Spark streaming from Kafka

Question · votes: 0 · answers: 1

I am trying to stream data from Kafka using Spark; my code is as follows:

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession 
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder.master("yarn")\
    .getOrCreate()

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "11.11.11.11:9092") \
    .option("subscribe", "topic-1") \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
    .option("kafka.sasl.jaas.config", 'org.apache.kafka.common.security.scram.ScramLoginModule required username="bdsm" password="C28776666w";') \
    .option("kafka.ssl.truststore.location", "/usr/local/hadoop/spark-3.3.2-bin-hadoop3/kafka_broker_topic.trustst.jks") \
    .option("kafka.ssl.truststore.password", "storepass") \
    .option("kafka.ssl.keystore.location", "/usr/local/hadoop/spark-3.3.2-bin-hadoop3/kafka_broker_topic.keyst.jks") \
    .option("kafka.ssl.keystore.password", "storepass") \
    .option("kafka.ssl.key.password", "storepass") \
    .option("kafka.session.timeout.ms", "6000") \
    .option("startingOffsets", "latest") \
    .load()

# NOTE: `schema` is referenced below but was never defined; it must match
# the JSON structure of the Kafka message values. The fields here are
# placeholders only.
schema = StructType([
    StructField("timestamp", StringType(), True),
    StructField("message", StringType(), True),
])

data = df.selectExpr("CAST(value AS STRING) as json") \
    .select(from_json("json", schema).alias("data")) \
    .select("data.*")

query = data \
    .writeStream \
    .format("parquet") \
    .option("path", "hdfs://10.10.10.10:8020/user/spark/logs_spark") \
    .option("checkpointLocation", "hdfs://10.10.10.10:8020/user/spark/checkpoint/dir") \
    .trigger(processingTime='2 minutes') \
    .start()

query.awaitTermination()

However, I run into the following problem:

23/06/16 08:42:46 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/06/16 08:42:46 ERROR MicroBatchExecution: Query [id = c8b77688-eb2e-42b6-9b1d-0f091bc5ded3, runId = 93ab5673-96de-4aee-bb5e-fe26a20f9c83] terminated with error
java.lang.NoClassDefFoundError: org/apache/spark/kafka010/KafkaConfigUpdater
    at org.apache.spark.sql.kafka010.KafkaSourceProvider$.kafkaParamsForDriver(KafkaSourceProvider.scala:645)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan.toMicroBatchStream(KafkaSourceProvider.scala:482)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$1.$anonfun$applyOrElse$4(MicroBatchExecution.scala:104)

What could be causing this? For reference, my development environment includes the following:

spark 3.3.2
scala 2.12
spark-streaming-kafka-0-10-assembly_2.12-3.3.2.jar
spark-streaming-kafka-0-10_2.12-3.3.2.jar
spark-token-provider-kafka-0-10_2.12-3.3.2.jar
spark-sql-kafka-0-10_2.12-3.3.2.jar
kafka-clients-3.3.2.jar
Tags: pyspark, apache-spark-sql, spark-streaming, maven-plugin
1 Answer

Found the solution. The problem was that hadoop/lib also contained the jars kafka-clients, spark-token-provider-kafka, and spark-sql-kafka. Since spark-sql-kafka already pulls in kafka-clients and spark-token-provider-kafka as transitive dependencies, those two do not need to be installed separately, and the duplicate copies caused the NoClassDefFoundError at runtime.
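As a sketch of how to check for and avoid this class of problem (the directory paths and script name below are assumptions, not taken from the question): first locate stray copies of the connector jars, then let spark-submit resolve the connector and its transitive dependencies from Maven instead of placing jars by hand.

```shell
# 1) Look for duplicate copies of the Kafka connector jars in the
#    directories Spark puts on its classpath (paths are examples;
#    adjust to your installation).
find /usr/local/hadoop/lib \
     /usr/local/hadoop/spark-3.3.2-bin-hadoop3/jars \
     -name '*kafka*.jar' 2>/dev/null

# 2) Or let spark-submit fetch spark-sql-kafka-0-10 and its transitive
#    dependencies (kafka-clients, spark-token-provider-kafka-0-10, ...)
#    automatically. The Maven coordinate must match the Scala (2.12)
#    and Spark (3.3.2) versions in use.
spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2 \
  streaming_job.py
```

With --packages, Spark downloads one consistent set of jars, so mismatched or duplicated manual copies cannot shadow each other on the classpath.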
