在过去的 23 小时里,我一直在使用 Kafka 运行 spark-structured streaming。我可以看到 Shuffle Read 和 Shuffle Write 急剧增加,最后,驱动程序因“内存不足”而停止。
数据推送到 Kafak 是每秒 3 json 和 Spark streaming processingTime='30 seconds'
spark = SparkSession \
.builder \
.master("spark://spark-master:7077") \
.appName("demo") \
.config("spark.executor.cores", 1) \
.config("spark.cores.max", "4") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.config("spark.sql.warehouse.dir", "hdfs://172.30.7.36:9000/user/hive/warehouse") \
.config("spark.streaming.stopGracefullyOnShutdown", "true") \
.config("spark.executor.memory", '1g') \
.config("spark.scheduler.mode", "FAIR") \
.config("spark.driver.memory", '2g') \
.config("spark.sql.caseSensitive", "true") \
.config("spark.sql.shuffle.partitions", 8) \
.enableHiveSupport() \
.getOrCreate()
CustDf \
.writeStream \
.queryName("customerdatatest") \
.format("delta") \
.outputMode("append") \
.trigger(processingTime='30 seconds') \
.option("mergeSchema", "true") \
.option("checkpointLocation", "/checkpoint/bronze_customer/") \
.toTable("bronze.customer")
我期待这个 straming 应该至少连续运行 1 个月。
请帮我解决这个问题。天气我错过了任何配置?