I want to send this simple PySpark DataFrame to Kafka, but no matter what I try I keep getting an error. A plain Python producer script works, and reading a stream with PySpark works; my only problem is writing to Kafka from PySpark. Can anyone help me?
from pyspark.sql import SparkSession
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.2.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 pyspark-shell'
# Create a SparkSession
spark = SparkSession.builder \
    .appName("KafkaSinkExample") \
    .getOrCreate()
# Sample data
data = [("key1", "value1"), ("key2", "value2"), ("key3", "value3")]
df = spark.createDataFrame(data, ["key", "value"])
# Write data to Kafka
(df.write
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker:29092")
    .option("topic", "test")
    .save())
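
For context, the plain producer script that does work is roughly this (a minimal sketch, assuming the kafka-python package; the topic "test" and the host-mapped listener localhost:9092 come from the setup below):

from kafka import KafkaProducer  # pip install kafka-python

# From the host, use the advertised listener localhost:9092;
# from inside the compose network it would be broker:29092.
producer = KafkaProducer(bootstrap_servers="localhost:9092")
producer.send("test", key=b"key1", value=b"value1")
producer.flush()
producer.close()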
Here is my docker-compose file:
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server:7.5.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  pyspark:
    image: jupyter/pyspark-notebook:latest
    container_name: pyspark
    ports:
      - "8888:8888"
    environment:
      - PYSPARK_PYTHON=python3
      - PYSPARK_DRIVER_PYTHON=python3
      - SPARK_HOME=/usr/local/spark
    volumes:
      - ./notebooks:/home/jovyan/work  # mount your notebooks folder, adjust as needed
The error I get:
23/12/25 16:50:56 ERROR TaskSetManager: Task 1 in stage 0.0 failed 1 times; aborting job
Traceback (most recent call last):
File "/home/jovyan/preprocessing/bing.py", line 23, in <module>
.save())
^^^^^^
File "/usr/local/spark/python/pyspark/sql/readwriter.py", line 1461, in save
self._jwrite.save()
File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
File "/usr/local/spark/python/pyspark/errors/exceptions/captured.py", line 179, in deco
return f(*a, **kw)
^^^^^^^^^^^
File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o49.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1) (0f08baeba383 executor driver): java.lang.NoSuchMethodError: 'boolean org.apache.spark.sql.catalyst.expressions.Cast$.apply$default$4()'
I ran into the same problem. The NoSuchMethodError comes from a version mismatch: the Kafka connector JARs you request (3.2.0) are binary-incompatible with the Spark runtime shipped in jupyter/pyspark-notebook:latest. If you are on the latest PySpark, 3.5.0, edit the packages to match:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 pyspark-shell'
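
To keep the JARs from drifting out of sync again, you can derive the connector coordinates from the installed PySpark version instead of hard-coding them (a sketch, assuming the default Scala 2.12 build of Spark; PYSPARK_SUBMIT_ARGS must be set before the SparkSession is created):

import os
import pyspark

# Pin the Kafka connector to the exact Spark version that is installed.
# spark-sql-kafka-0-10 alone is enough for DataFrame reads and writes;
# spark-streaming-kafka-0-10 is only needed for the legacy DStream API.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    f'--packages org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark.__version__} '
    'pyspark-shell'
)

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("KafkaSinkExample").getOrCreate()

With the versions aligned, the original df.write.format("kafka") call should succeed unchanged.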