PySpark streaming write to Kafka not working


I want to send this simple PySpark DataFrame to Kafka. I have tried everything, but I always get an error. A plain Python producer script works, and reading a stream from Kafka with PySpark works too; the only thing that fails is writing to Kafka from PySpark. Can anyone help?
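
For reference, the plain producer I mention was roughly along these lines (a minimal sketch, assuming the kafka-python package and a script run from the host, which is why it targets localhost:9092):

from kafka import KafkaProducer

# Connect through the host-mapped listener (localhost:9092 in the compose file below)
producer = KafkaProducer(bootstrap_servers="localhost:9092")
producer.send("test", key=b"key1", value=b"value1")
producer.flush()
producer.close()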

from pyspark.sql import SparkSession
import os 

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.2.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 pyspark-shell'

# Create a SparkSession
spark = SparkSession.builder \
    .appName("KafkaSinkExample") \
    .getOrCreate()

# Sample data
data = [("key1", "value1"), ("key2", "value2"), ("key3", "value3")]
df = spark.createDataFrame(data, ["key", "value"])

# Write data to Kafka (a batch write; the Kafka sink expects string or binary "key" and "value" columns)
(df.write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:29092") \
    .option("topic", "test") \
    .save())

Here is my docker-compose file:

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server:7.5.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  pyspark:
    image: jupyter/pyspark-notebook:latest
    container_name: pyspark
    ports:
      - "8888:8888"
    environment:
      - PYSPARK_PYTHON=python3
      - PYSPARK_DRIVER_PYTHON=python3
      - SPARK_HOME=/usr/local/spark
    volumes:
      - ./notebooks:/home/jovyan/work # Mount your notebooks folder or adjust as needed
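
A note on the listener setup above: with this KAFKA_ADVERTISED_LISTENERS configuration, clients inside the compose network (such as the pyspark container) must connect via broker:29092, while clients on the host must use localhost:9092, which matches the bootstrap servers used in the scripts above.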


The error I get:

23/12/25 16:50:56 ERROR TaskSetManager: Task 1 in stage 0.0 failed 1 times; aborting job
Traceback (most recent call last):
  File "/home/jovyan/preprocessing/bing.py", line 23, in <module>
    .save())
     ^^^^^^
  File "/usr/local/spark/python/pyspark/sql/readwriter.py", line 1461, in save
    self._jwrite.save()
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/usr/local/spark/python/pyspark/errors/exceptions/captured.py", line 179, in deco
    return f(*a, **kw)
           ^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o49.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1) (0f08baeba383 executor driver): java.lang.NoSuchMethodError: 'boolean org.apache.spark.sql.catalyst.expressions.Cast$.apply$default$4()'
Tags: docker, pyspark, apache-kafka, jupyter-notebook, spark-streaming
1 Answer

I ran into the same problem. If you are using the latest PySpark version, 3.5.0, the NoSuchMethodError means the connector jars pinned to 3.2.0 are binary-incompatible with the Spark runtime; you should edit the packages so their version matches your PySpark version:

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 pyspark-shell'
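
More generally, the Kafka connector coordinates have to match the Spark runtime exactly. One way to keep them in sync is to derive the version from the installed PySpark (a minimal sketch; the _2.12 Scala suffix assumes the default prebuilt Spark binaries):

import os
import pyspark

# Build the --packages coordinate from the running PySpark version so the
# Kafka connector always matches the Spark runtime (e.g. 3.5.0).
# This must run before the SparkSession is created.
spark_version = pyspark.__version__
kafka_pkg = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{spark_version}"
os.environ["PYSPARK_SUBMIT_ARGS"] = f"--packages {kafka_pkg} pyspark-shell"

Also note that the DataFrame API used in the question only needs spark-sql-kafka-0-10; spark-streaming-kafka-0-10 is the integration for the legacy DStream API and can be dropped.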