Class loading problems in application mode


I am changing the deployment mode of my Flink jobs on a YARN cluster from session mode to application mode. I have many jobs with the same implementation, and all of them run fine except two.

In session mode, all of my jars run fine.

In application mode, I get a java.lang.NoClassDefFoundError: Could not initialize class error:

2024-04-17 10:06:16,055 ERROR com.company.project.jobs.MyClassName$                 [] -  ==>> java.lang.NoClassDefFoundError: Could not initialize class com.company.project.avro.POJOname
    sun.misc.Unsafe.ensureClassInitialized(Native Method)
    sun.reflect.UnsafeFieldAccessorFactory.newFieldAccessor(UnsafeFieldAccessorFactory.java:43)
    sun.reflect.ReflectionFactory.newFieldAccessor(ReflectionFactory.java:156)
    java.lang.reflect.Field.acquireFieldAccessor(Field.java:1088)
    java.lang.reflect.Field.getFieldAccessor(Field.java:1069)
    java.lang.reflect.Field.get(Field.java:393)
    org.apache.flink.formats.avro.typeutils.AvroFactory.getSpecificDataForClass(AvroFactory.java:166)
    org.apache.flink.formats.avro.AvroDeserializationSchema.checkAvroInitialized(AvroDeserializationSchema.java:154)
    org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:135)
    com.company.project.jobs.MyClassName$$anonfun$transformStream$1.apply(MyClassName.scala:47)
    com.company.project.jobs.MyClassName$$anonfun$transformStream$1.apply(MyClassName.scala:31)
    org.apache.flink.streaming.api.scala.DataStream$$anon$4.map(DataStream.scala:648)
    org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
    org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)
    org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)
    org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
    org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:196)
    org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
    org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:65)
    org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
    org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51)
    org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:51)
    org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:32)
    org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
    org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:354)
    org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
    org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:495)
    org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:806)
    org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:758)
    org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    java.lang.Thread.run(Thread.java:748)
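For context, "Could not initialize class" differs from a plain "class not found": the JVM did find the class, but its static initialization had already failed on an earlier access. That first failure is reported as an ExceptionInInitializerError carrying the real root cause (often a conflicting library version resolved ahead of the fat jar under parent-first class loading); every later access only produces this follow-up NoClassDefFoundError. A minimal, self-contained sketch of that behavior (the object name and failure are made up, not the asker's code):

```scala
// A Scala object's initializer plays the role of a static initializer
// that blows up, e.g. because an incompatible library is on the classpath.
object Boom {
  throw new RuntimeException("simulated init failure (e.g. wrong Avro version)")
}

object Demo {
  // Force initialization of Boom and report which error class escapes.
  def access(): String =
    try { Boom.hashCode(); "ok" }
    catch { case t: Throwable => t.getClass.getSimpleName }

  def main(args: Array[String]): Unit = {
    println(access()) // ExceptionInInitializerError -- wraps the root cause
    println(access()) // NoClassDefFoundError: "Could not initialize class Boom$"
  }
}
```

So when hunting this down, search the TaskManager logs for the first ExceptionInInitializerError on the same class; that stack trace names the actual cause.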

Flink: 1.14.6, language: Scala

It is run like this:

"${FLINK_HOME:?}"/bin/flink run-application -d \
      -t yarn-application \
      -Dclassloader.resolve-order=parent-first \
      -Dyarn.application.name="${1^^}" \
      -Dyarn.application.queue=root.queue.name \
      -Dtaskmanager.numberOfTaskSlots="$slots" \
      -Dtaskmanager.memory.process.size="${mem}m" \
      -Dyarn.ship-files="${HOME}/conf/${1,,}.properties" \
      -p "${parallelism:?}" \
      hdfs://nNode/.../lib/prefix-${context,,}-${jar_version}.jar \
        --job_props_path "${1,,}.properties"

The Avro library is in the fat jar, as usual:

"org.apache.flink" % "flink-avro" % flinkVersion
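A typical sbt wiring for this setup might look as follows. This is a sketch with assumed module names and scopes, not the asker's build file: the core Flink dependencies come from the cluster's distribution, while flink-avro ships inside the assembly (fat) jar.

```scala
// build.sbt sketch -- names and scopes are assumptions.
val flinkVersion = "1.14.6"

libraryDependencies ++= Seq(
  // Provided by the Flink distribution on the cluster, kept out of the fat jar:
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
  // Bundled into the fat jar, as in the question:
  "org.apache.flink" % "flink-avro" % flinkVersion
)
```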

  • What am I doing wrong?
  • Why do all the other jobs run fine, but these two do not?

Any help would be appreciated.

scala apache-flink

1 Answer

I figured it out. My problem was the libraries. I added the yarn.provided.lib.dirs option to the run command, and cleaned up the lib/ folder so that only the default libraries remain.

...
-Dyarn.provided.lib.dirs="hdfs://nNode/user/doe/flink14/lib"
...

Without this YARN option, Flink uses the local lib/ folder, which contains the Hive libraries that caused the problem:

hive-exec-1.1.0.jar
hive-metastore-1.1.0.jar
flink-connector-hive_2.11-1.14.6.jar
libfb303-0.9.2.jar
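To see which jars actually provide a conflicting class (for instance, old Avro classes bundled into an uber jar such as hive-exec), a small scan like the following can help. This is a debugging sketch; the directory and the class entry are examples, not taken from the question:

```scala
import java.io.File
import java.util.zip.ZipFile

object FindClassProviders {
  // Return every jar in libDir that bundles the given class entry.
  def jarsContaining(libDir: File, classEntry: String): Seq[File] = {
    val jars = Option(libDir.listFiles()).getOrElse(Array.empty[File])
      .filter(_.getName.endsWith(".jar"))
    jars.toSeq.filter { jar =>
      val zip = new ZipFile(jar)
      try zip.getEntry(classEntry) != null
      finally zip.close()
    }
  }

  def main(args: Array[String]): Unit =
    jarsContaining(
      new File(sys.env.getOrElse("FLINK_HOME", ".") + "/lib"), // example path
      "org/apache/avro/specific/SpecificData.class"            // example class
    ).foreach(j => println(j.getName))
}
```

Running this against both the local lib/ folder and the fat jar shows duplicate providers; with parent-first resolution, any class that appears twice is a candidate for the initialization failure.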