NoClassDefFoundError when integrating Kafka with Spark Structured Streaming through PySpark

Problem description (votes: 0, answers: 1)

I am using:

  • Spark version: 3.0.0-preview2
  • Scala version: 2.12
  • Java version: 1.8
  • Kafka broker version: 2.2.0

I have configured two JARs (spark-sql-kafka-0-10_2.12-3.0.0-preview2.jar and kafka-clients-2.2.0.jar) in the spark-defaults.conf file, and I have also placed the JARs in the $SPARK_HOME/jars folder. When I try to view the key and value of the data coming from the Kafka server (the data arrives from Kafka as key-value pairs in JSON format), I get the following error:
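For reference, this is roughly how I registered the two JARs; a minimal sketch of the spark-defaults.conf entries, where the install path /opt/spark-3.0.0-preview2-bin-hadoop2.7 is my assumption and should be adjusted to your layout:

spark.driver.extraClassPath   /opt/spark-3.0.0-preview2-bin-hadoop2.7/jars/spark-sql-kafka-0-10_2.12-3.0.0-preview2.jar:/opt/spark-3.0.0-preview2-bin-hadoop2.7/jars/kafka-clients-2.2.0.jar
spark.executor.extraClassPath /opt/spark-3.0.0-preview2-bin-hadoop2.7/jars/spark-sql-kafka-0-10_2.12-3.0.0-preview2.jar:/opt/spark-3.0.0-preview2-bin-hadoop2.7/jars/kafka-clients-2.2.0.jar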

java.lang.NoClassDefFoundError: org/apache/spark/kafka010/KafkaConfigUpdater
    at org.apache.spark.sql.kafka010.KafkaSourceProvider$.kafkaParamsForDriver(KafkaSourceProvider.scala:580)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan.toMicroBatchStream(KafkaSourceProvider.scala:466)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.$anonfun$applyOrElse$3(MicroBatchExecution.scala:102)
    at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:95)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.applyOrElse(MicroBatchExecution.scala:81)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:286)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:286)
    ...
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:81)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
    ...
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.kafka010.KafkaConfigUpdater
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
    ... 30 more

The same NoClassDefFoundError and stack trace are then printed by the stream execution thread (id = 504665ad-c59a-4a85-8c46-4d6c741b0adf, runId = 36bc5028-6b34-4d6c-a265-4c38ce66cfcb).
Here is the code I am using to view the key-value pairs of the Kafka data:

from pyspark import *
from pyspark.sql import *
from pyspark.sql.utils import *
from pyspark.streaming import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

conf = SparkConf().setMaster("local")
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
sqlContext = SQLContext(sc)

spark = SparkSession \
    .builder \
    .getOrCreate()

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "XX.XX.XX.XX:9092,XX.XX.XX.XX:9092") \
    .option("subscribe", "topic1,topic2,topic3") \
    .option("failOnDataLoss", "false") \
    .load()

table = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

query = table \
    .writeStream \
    .outputMode("append") \
    .option("truncate", "false") \
    .format("console") \
    .start() \
    .awaitTermination()
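Since the Kafka value arrives as JSON, the eventual goal is to parse it into columns. This is not part of the failing code above, just a minimal sketch of that step using from_json; the schema and field names below are hypothetical placeholders for my actual payload:

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

# Hypothetical schema; replace the fields with the real structure of the JSON value
value_schema = StructType([
    StructField("id", StringType()),
    StructField("payload", StringType()),
])

# Parse the JSON string in "value" and flatten the parsed struct into columns
parsed = table.withColumn("json", from_json(col("value"), value_schema)) \
              .select("key", "json.*")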

Can someone help me resolve this error? Thanks in advance!
apache-spark pyspark apache-kafka pyspark-sql spark-structured-streaming
1 Answer
0 votes
I resolved the issue by adding spark-streaming-kafka-0-10-assembly_2.11-2.4.0.jar to the $SPARK_HOME/jars folder and to the spark-defaults.conf file in the following form:

spark.driver.extraClassPath   /opt/spark-2.4.0-bin-hadoop2.7/jars/spark-sql-kafka-0-10_2.11-2.4.0.jar:/opt/spark-2.4.0-bin-hadoop2.7/jars/kafka-clients-2.2.0.jar:/opt/spark-2.4.0-bin-hadoop2.7/jars/spark-streaming-kafka-0-10-assembly_2.11-2.4.0.jar
spark.executor.extraClassPath /opt/spark-2.4.0-bin-hadoop2.7/jars/spark-sql-kafka-0-10_2.11-2.4.0.jar:/opt/spark-2.4.0-bin-hadoop2.7/jars/kafka-clients-2.2.0.jar:/opt/spark-2.4.0-bin-hadoop2.7/jars/spark-streaming-kafka-0-10-assembly_2.11-2.4.0.jar

and by running the spark-submit command as follows:

$SPARK_HOME/bin/spark-submit --master yarn --jars $SPARK_HOME/jars/spark-sql-kafka-0-10_2.12-3.0.0-preview.jar,$SPARK_HOME/jars/kafka-clients-2.2.0.jar,$SPARK_HOME/jars/spark-streaming-kafka-0-10-assembly_2.12-3.0.0-preview2.jar test.py
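As an alternative sketch (not verified in this exact setup), spark-submit can resolve the Kafka integration and its transitive dependencies, which should include the module containing org.apache.spark.kafka010.KafkaConfigUpdater, directly from Maven instead of listing individual JARs:

$SPARK_HOME/bin/spark-submit --master yarn \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0-preview2 \
  test.py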
