我想使用 Kafka 流式传输 Twitter 数据,并使用 Spark 进行情绪分析。生产者运行良好,它可以从 Twitter API 检索数据到 Kafka 主题,但我作为消费者在 Spark 中遇到错误。
以下是 Spark Session 的代码,其中包含文档中引用的包。
# Config
spark = SparkSession \
.builder \
.master("local[*]") \
.appName("TwitterSentimentAnalysis") \
.config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0") \
.getOrCreate()
但是当我尝试从 Kafka 主题中读取数据时
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "twitter") \
.option("startingOffsets", "latest") \
.option("header", "true") \
.load() \
.selectExpr("CAST(value AS STRING) as message")
它显示这样的错误
引发 Py4JJavaError( py4j.protocol.Py4JJavaError:调用 o36.load 时出错。 java.lang.NoSuchMethodError: 'scala.collection.mutable.WrappedArray scala.Predef$.wrapRefArray(java.lang.Object[])'
我已经切换到另一个版本,Kafka、Spark 和 PySpark,但仍然出现错误。尝试将 Docker 用于 Spark 和 Kafka,但仍然出现相同的错误。
spark-sql-kafka-*及其依赖可以使用--packages直接添加到spark-submit中,如
./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0 ...
也有可能你有spark和kafka之间的版本兼容性问题。确保您使用的是好的版本。