Flink BucketingSink crashes with NoClassDefFoundError: Lorg/apache/hadoop/fs/FileSystem

Question (2 votes, 1 answer)

Flink versions tried: 1.4.0, 1.4.1, 1.4.2

When I try to run this simple Flink application:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink

val env: StreamExecutionEnvironment =
  StreamExecutionEnvironment.getExecutionEnvironment

env.fromElements("a", "b", "c")
  .addSink(new BucketingSink[String]("file:///Users/joshlemer/projects/my-project/target/output"))

I get the following runtime exception:

Exception in thread "main" java.lang.NoClassDefFoundError: Lorg/apache/hadoop/fs/FileSystem;
    at java.lang.Class.getDeclaredFields0(Native Method)
    at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
    at java.lang.Class.getDeclaredFields(Class.java:1916)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:72)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1550)
    at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:184)
    at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1134)
    at org.apache.flink.streaming.api.scala.DataStream.addSink(DataStream.scala:1036)
    at com.company.project.Job$.run(Job.scala:52)
    at com.company.project.Job$.main(Job.scala:28)
    at com.company.project.Job.main(Job.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.fs.FileSystem
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)  

Even though I can write to a text file with dataStream.writeAsText(...) just fine.

My build.sbt is also quite typical:

val flinkVersion = "1.4.2"

val flinkDependencies =
  Seq(
    "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
    "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
    "org.apache.flink" %% "flink-statebackend-rocksdb" % flinkVersion,
    "org.apache.flink" %% "flink-connector-kafka-0.11" % flinkVersion,
    "org.apache.flink" %% "flink-connector-filesystem" % flinkVersion,
    "org.apache.flink" %% "flink-test-utils" % flinkVersion % "test",
    "org.apache.flink" % "flink-test-utils-junit" % flinkVersion % "test"
  )

As well as an idea.sbt, as Flink recommends for IntelliJ users:

lazy val mainRunner = project.in(file("mainRunner")).dependsOn(RootProject(file("."))).settings(
  // we set all provided dependencies to none, so that they are included in the classpath of mainRunner
  libraryDependencies := (libraryDependencies in RootProject(file("."))).value.map{
    module =>
      if (module.configurations.equals(Some("provided"))) {
        module.copy(configurations = None)
      } else {
        module
      }
  }
)

This is how I run the application (mainRunner is set as the application's classpath module).

I am confused as to why this happens, and in particular why the package name begins with "Lorg" instead of "org"?

Thanks!

java scala apache-flink
1 Answer

2 votes

From the 1.4 release notes:

Starting with version 1.4, Flink can run without any Hadoop dependencies present on the classpath. Along with simply running without Hadoop, this enables Flink to dynamically use whatever Hadoop version is available on the classpath.

For example, you can download the Hadoop-free release of Flink but use it to run on any supported version of YARN, and Flink will dynamically pick up the Hadoop dependencies from YARN.

This also means that in cases where you use connectors to HDFS, such as the BucketingSink or RollingSink, you now have to make sure that you either use a Flink distribution with bundled Hadoop dependencies, or make sure to include Hadoop dependencies when building a jar file for your application.
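A sketch of the second option for an sbt build like the one above (the hadoop-client coordinates are a standard choice, but the 2.8.3 version here is an assumption — match it to the Hadoop version of your cluster if you run on one):

```scala
// build.sbt — hypothetical addition; version 2.8.3 is an assumption.
// Brings org.apache.hadoop.fs.FileSystem onto the classpath so that
// ClosureCleaner can resolve BucketingSink's fields via reflection.
libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.8.3"
```

Because mainRunner copies the root project's dependencies, this addition flows through to the IntelliJ run classpath as well.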
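As for the "Lorg" prefix: that is the JVM's internal type-descriptor notation, in which an object type is written as `L<binary name>;`. The error surfaces in that form because `getDeclaredFields` fails while resolving a field's type descriptor, and reports the descriptor rather than the plain class name. A small sketch showing the same descriptor notation through reflection:

```scala
object DescriptorDemo extends App {
  // JVM type descriptors write an object type as "L<binary-name>;".
  // Array class names expose this raw descriptor form directly:
  println(classOf[Array[String]].getName) // prints "[Ljava.lang.String;"
}
```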
