Spark Shuffle Read 和 Shuffle Write 在结构化尖叫中增加

问题描述 投票:0回答:0

在过去的 23 小时里,我一直在使用 Kafka 运行 spark-structured streaming。我可以看到 Shuffle Read 和 Shuffle Write 急剧增加,最后,驱动程序因“内存不足”而停止。

数据推送到 Kafak 是每秒 3 json 和 Spark streaming processingTime='30 seconds'

spark = SparkSession \
    .builder \
    .master("spark://spark-master:7077") \
    .appName("demo") \
    .config("spark.executor.cores", 1) \
    .config("spark.cores.max", "4") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.sql.warehouse.dir", "hdfs://172.30.7.36:9000/user/hive/warehouse") \
    .config("spark.streaming.stopGracefullyOnShutdown", "true") \
    .config("spark.executor.memory", '1g') \
    .config("spark.scheduler.mode", "FAIR") \
    .config("spark.driver.memory", '2g') \
    .config("spark.sql.caseSensitive", "true") \
    .config("spark.sql.shuffle.partitions", 8) \
    .enableHiveSupport() \
    .getOrCreate()
    CustDf \
        .writeStream \
        .queryName("customerdatatest") \
        .format("delta") \
        .outputMode("append") \
        .trigger(processingTime='30 seconds') \
        .option("mergeSchema", "true") \
        .option("checkpointLocation", "/checkpoint/bronze_customer/") \
        .toTable("bronze.customer")

我期待这个 straming 应该至少连续运行 1 个月。

请帮我解决这个问题。天气我错过了任何配置?

spark-structured-streaming
© www.soinside.com 2019 - 2024. All rights reserved.