Unable to use Kafka jars in a Jupyter notebook

Problem description

I am using Spark Structured Streaming to read from a single-node Kafka broker, with everything running locally on a Mac. Reading works via spark-submit, but the same code fails in a Jupyter notebook.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col,from_json
from pyspark.sql.types import  StructType, StructField, StringType, LongType, DoubleType, IntegerType, ArrayType

import time
spark.stop()  # stop the notebook's existing session before reconfiguring (assumes `spark` is already defined)
time.sleep(30)  # give the old session time to shut down

spark = SparkSession.builder\
    .appName('KafkaIntegration6')\
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4")\
    .getOrCreate()
print('done')

kafka_stream_df = spark\
    .readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "localhost:9092")\
    .option("subscribe", "TestTopic2")\
    .option("startingOffsets", "earliest")\
    .load()

Error: AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".

Question: The same code reads from Kafka just fine via spark-submit when I pass spark-sql-kafka-0-10_2.12:3.4 as --packages. But when I run it from the Jupyter notebook, pulling in the Kafka package during session creation as above, I get this error. Looking at the Spark web UI at http://localhost:4040/environment/, I can see the package org.apache.spark:spark-sql-kafka-0-10_2.12:3.4 listed under spark.jars.packages.

python apache-spark apache-kafka spark-structured-streaming
1 Answer

This alternative approach still seems to work:

import pyspark
from pyspark.sql import SparkSession
import os

scala_version = '2.12'  # must match the Scala build of your PySpark installation
spark_version = pyspark.__version__

packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    'org.apache.kafka:kafka-clients:3.5.1'
]

args = os.environ.get('PYSPARK_SUBMIT_ARGS', '')
if not args:
    args = f'--packages {",".join(packages)}'
    print('Using packages', packages)
    os.environ['PYSPARK_SUBMIT_ARGS'] = f'{args} pyspark-shell'

spark = SparkSession.builder\
    .master("local")\
    .appName("kafka-example")\
    .getOrCreate()

In the logs I get:

Using packages ['org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1', 'org.apache.kafka:kafka-clients:3.5.1']
:: loading settings :: url = jar:file:/private/tmp/venv/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /Users/jomoore/.ivy2/cache
The jars for the packages stored in: /Users/jomoore/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.apache.kafka#kafka-clients added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-27575234-cd3f-4e23-9c25-4ea0f1309b35;1.0
    confs: [default]
    found org.apache.spark#spark-sql-kafka-0-10_2.12;3.4.1 in central
    found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.4.1 in central
    found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
    found org.apache.hadoop#hadoop-client-api;3.3.4 in central
    found org.xerial.snappy#snappy-java;1.1.10.1 in local-m2-cache
    found org.slf4j#slf4j-api;2.0.6 in central
    found commons-logging#commons-logging;1.1.3 in local-m2-cache
    found com.google.code.findbugs#jsr305;3.0.0 in local-m2-cache
    found org.apache.commons#commons-pool2;2.11.1 in local-m2-cache
    found org.apache.kafka#kafka-clients;3.5.1 in local-m2-cache
    found com.github.luben#zstd-jni;1.5.5-1 in local-m2-cache
    found org.lz4#lz4-java;1.8.0 in local-m2-cache
downloading https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.4.1/spark-sql-kafka-0-10_2.12-3.4.1.jar ...
    [SUCCESSFUL ] org.apache.spark#spark-sql-kafka-0-10_2.12;3.4.1!spark-sql-kafka-0-10_2.12.jar (195ms)
downloading file:/Users/jomoore/.m2/repository/org/apache/kafka/kafka-clients/3.5.1/kafka-clients-3.5.1.jar ...
    [SUCCESSFUL ] org.apache.kafka#kafka-clients;3.5.1!kafka-clients.jar (11ms)
downloading https://repo1.maven.org/maven2/org/apache/spark/spark-token-provider-kafka-0-10_2.12/3.4.1/spark-token-provider-kafka-0-10_2.12-3.4.1.jar ...
    [SUCCESSFUL ] org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.4.1!spark-token-provider-kafka-0-10_2.12.jar (117ms)
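A note on why this route works where setting spark.jars.packages on the builder may not: PYSPARK_SUBMIT_ARGS is consumed when PySpark launches the JVM, so the packages are resolved before any session exists, whereas a builder config set after the notebook has already started a JVM can arrive too late. The `if not args` guard also leaves any externally supplied value untouched. A minimal pure-Python sketch of that guard logic (the helper name `build_submit_args` is my own invention, not a PySpark API):

```python
import os

def build_submit_args(packages, existing=''):
    """Build a PYSPARK_SUBMIT_ARGS value, preserving any pre-set one.

    Mirrors the guard in the answer: --packages is only injected when
    the variable is empty, so a value exported in the shell wins.
    """
    if existing:
        return existing
    return f'--packages {",".join(packages)} pyspark-shell'

packages = [
    'org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1',
    'org.apache.kafka:kafka-clients:3.5.1',
]

# Must run before SparkSession.builder...getOrCreate() starts the JVM.
value = build_submit_args(packages, os.environ.get('PYSPARK_SUBMIT_ARGS', ''))
os.environ['PYSPARK_SUBMIT_ARGS'] = value
print(value)
```

The `pyspark-shell` suffix is required: without it, PySpark rejects the submit arguments when launching the gateway.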