我在这里陷入困境。我正在尝试实现一个非常基本的管道,它从 kafka 读取数据并在 Spark 中处理它。我面临的问题是 apache Spark 突然关闭并给出上述错误消息。我的pyspark版本是3.5.1,scala版本是2.12.18。
有问题的代码是:-
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder \
.appName('my_app') \
.config("spark.jars", "/usr/local/spark/jars/spark-sql-kafka-0-10_2.12-3.5.1.jar") \
.getOrCreate()
df = spark.readStream \
.format('kafka') \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "quickstart-events") \
.option("startingOffsets", "earliest") \
.load()
query = df.writeStream \
.trigger(processingTime='5 seconds') \
.outputMode("update") \
.format("console") \
.start()
query.awaitTermination()
我已经下载了所有必需的 jar 文件并将它们放在 Spark 中的相应目录中。我能够以客户身份读取来自 kafka 代理的消息,因此排除了我的 kafka 安装出现故障的可能性。任何帮助将不胜感激。
NoSuchMethodError 的一般原因是
Java中NoSuchMethodException和NoSuchMethodError的区别
我怀疑你正在混合
_2.12
和 _2.13
依赖项。
类
scala.collection.JavaConverters.AsJava
存在于 Scala 2.13.x 中,但不存在于 Scala 2.12.x 中。
您应该调查您的类路径。修改你的脚本
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder \
.appName('my_app') \
.getOrCreate()
print('Class path on start')
gateway = SparkContext._gateway
separator = gateway.jvm.java.lang.System.getProperty('path.separator')
for url in gateway.jvm.java.lang.System.getProperty('java.class.path').split(separator):
print(url)
print()
# classLoader = gateway.jvm.java.lang.ClassLoader.getSystemClassLoader()
# classLoader = gateway.java_gateway_server.getClass().getClassLoader()
classLoader = gateway.jvm.java.lang.Thread.currentThread().getContextClassLoader()
while classLoader is not None:
try:
urls = classLoader.getURLs()
print(f'Class loader {classLoader}:')
for url in urls:
print(url)
except:
print(f'Class loader {classLoader}: not URLClassLoader')
print()
classLoader = classLoader.getParent()
print("ok")
df = spark.readStream \
.format('kafka') \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "quickstart-events") \
.option("startingOffsets", "earliest") \
.load()
query = df.writeStream \
.trigger(processingTime='5 seconds') \
.outputMode("update") \
.format("console") \
.start()
query.awaitTermination()
并运行它
./spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1 /path/to/file.py
查看输出(您可以在此处发布)是否不仅有
_2.12
jar,而且还有 _2.13
jar。
使用
spark-submit
,.config("spark.jars", ...
在代码中不起作用,正确的是命令行中的 --packages ...
: