I am migrating my Flink jobs on a YARN cluster from session mode to application mode. I have many jobs with the same implementation; all of them work fine except two.
In session mode, all of my jars ran without problems.
In application mode, I get a
java.lang.NoClassDefFoundError: Could not initialize class
error:
2024-04-17 10:06:16,055 ERROR com.company.project.jobs.MyClassName$ [] - ==>> java.lang.NoClassDefFoundError: Could not initialize class com.company.project.avro.POJOname
sun.misc.Unsafe.ensureClassInitialized(Native Method)
sun.reflect.UnsafeFieldAccessorFactory.newFieldAccessor(UnsafeFieldAccessorFactory.java:43)
sun.reflect.ReflectionFactory.newFieldAccessor(ReflectionFactory.java:156)
java.lang.reflect.Field.acquireFieldAccessor(Field.java:1088)
java.lang.reflect.Field.getFieldAccessor(Field.java:1069)
java.lang.reflect.Field.get(Field.java:393)
org.apache.flink.formats.avro.typeutils.AvroFactory.getSpecificDataForClass(AvroFactory.java:166)
org.apache.flink.formats.avro.AvroDeserializationSchema.checkAvroInitialized(AvroDeserializationSchema.java:154)
org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:135)
com.company.project.jobs.MyClassName$$anonfun$transformStream$1.apply(MyClassName.scala:47)
com.company.project.jobs.MyClassName$$anonfun$transformStream$1.apply(MyClassName.scala:31)
org.apache.flink.streaming.api.scala.DataStream$$anon$4.map(DataStream.scala:648)
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)
org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)
org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:196)
org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:65)
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51)
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:51)
org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:32)
org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:354)
org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:495)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:806)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:758)
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
java.lang.Thread.run(Thread.java:748)
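As background on the error itself: the "Could not initialize class" wording means the class's static initializer already failed on an earlier load attempt; the original ExceptionInInitializerError carrying the root cause appears only once, earlier in the logs. A minimal Scala sketch of this JVM behavior (hypothetical names, not the job's actual classes):

```scala
// A class/object whose initializer throws yields ExceptionInInitializerError
// on the first load attempt, and only NoClassDefFoundError
// ("Could not initialize class ...") on every later reference.
object InitFailureDemo {
  object Broken {
    // Fails during object (static) initialization.
    val value: Int = sys.error("root cause: static init failed")
  }

  // Touch Broken and report which error the JVM raises.
  def touch(): String =
    try { Broken.value.toString }
    catch { case e: Throwable => e.getClass.getSimpleName }

  def main(args: Array[String]): Unit = {
    println(touch()) // first access: ExceptionInInitializerError
    println(touch()) // later accesses: NoClassDefFoundError
  }
}
```

So the NoClassDefFoundError in the trace above is a symptom; the actual initialization failure of the Avro POJO happened earlier.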
Flink: 1.14.6, Language: Scala
The job is submitted like this:
"${FLINK_HOME:?}"/bin/flink run-application -d \
-t yarn-application \
-Dclassloader.resolve-order=parent-first \
-Dyarn.application.name="${1^^}" \
-Dyarn.application.queue=root.queue.name \
-Dtaskmanager.numberOfTaskSlots="$slots" \
-Dtaskmanager.memory.process.size="${mem}m" \
-Dyarn.ship-files="${HOME}/conf/${1,,}.properties" \
-p "${parallelism:?}" \
hdfs://nNode/.../lib/prefix-${context,,}-${jar_version}.jar \
--job_props_path "${1,,}.properties"
The Avro library is packaged inside the fat jar as usual (
"org.apache.flink" % "flink-avro" % flinkVersion
)
Any help would be appreciated.
I figured it out. The problem was with the libraries. I added the configuration option
yarn.provided.lib.dirs
to the run command, and cleaned up the lib/
folder, leaving only the default libraries.
...
-Dyarn.provided.lib.dirs="hdfs://nNode/user/doe/flink14/lib"
...
Without this YARN option, Flink uses the local lib/ folder, which contains the Hive libraries that caused the problem:
hive-exec-1.1.0.jar
hive-metastore-1.1.0.jar
flink-connector-hive_2.11-1.14.6.jar
libfb303-0.9.2.jar
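For reference, the full working submit command would then look roughly like this (a sketch combining the original command with the new option; all paths and variables are the ones from the snippets above):

```shell
# Same submit command as before, plus yarn.provided.lib.dirs so the
# cluster uses the curated HDFS lib dir instead of the local lib/
# folder that shipped the conflicting Hive jars.
"${FLINK_HOME:?}"/bin/flink run-application -d \
  -t yarn-application \
  -Dclassloader.resolve-order=parent-first \
  -Dyarn.provided.lib.dirs="hdfs://nNode/user/doe/flink14/lib" \
  -Dyarn.application.name="${1^^}" \
  -Dyarn.application.queue=root.queue.name \
  -Dtaskmanager.numberOfTaskSlots="$slots" \
  -Dtaskmanager.memory.process.size="${mem}m" \
  -Dyarn.ship-files="${HOME}/conf/${1,,}.properties" \
  -p "${parallelism:?}" \
  hdfs://nNode/.../lib/prefix-${context,,}-${jar_version}.jar \
  --job_props_path "${1,,}.properties"
```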