Kafka 流式传输到 Spark

问题描述 投票:0回答:1

我想使用 Kafka 流式传输 Twitter 数据,并使用 Spark 进行情绪分析。生产者运行良好,它可以从 Twitter API 检索数据到 Kafka 主题,但我作为消费者在 Spark 中遇到错误。

以下是 Spark Session 的代码,其中包含文档中引用的包。

 # Config
    spark = SparkSession \
        .builder \
        .master("local[*]") \
        .appName("TwitterSentimentAnalysis") \
        .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0") \
        .getOrCreate()

但是当我尝试从 Kafka 主题中读取数据时

df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "twitter") \
        .option("startingOffsets", "latest") \
        .option("header", "true") \
        .load() \
        .selectExpr("CAST(value AS STRING) as message")

它显示这样的错误

引发 Py4JJavaError( py4j.protocol.Py4JJavaError:调用 o36.load 时出错。 java.lang.NoSuchMethodError: 'scala.collection.mutable.WrappedArray scala.Predef$.wrapRefArray(java.lang.Object[])'

我已经切换到另一个版本,Kafka、Spark 和 PySpark,但仍然出现错误。尝试将 Docker 用于 Spark 和 Kafka,但仍然出现相同的错误。

apache-spark pyspark apache-kafka spark-structured-streaming
1个回答
0
投票

spark-sql-kafka-*及其依赖可以使用--packages直接添加到spark-submit中,如

./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0 ...

也有可能你有spark和kafka之间的版本兼容性问题。确保您使用的是好的版本。

© www.soinside.com 2019 - 2024. All rights reserved.