Spark Structured Streaming fails with executor out-of-memory on Kubernetes

Problem description · votes: 0 · answers: 1

I'm running a Spark Structured Streaming job that reads data from an object-storage bucket, applies some transformations and filters, then calls groupBy with aggregations and publishes the written output to object storage in CSV format.

maxFilesPerTrigger = 200
lookback_window = "24 hours"
sliding_interval = "1 hour"
watermark = "1 hour"
output_mode = "update"

df = spark\
    .readStream\
    .format(file_type)\
    .schema(schema)\
    .option("latestFirst", latest_first)\
    .option("maxFileAge", maxFileAge)\
    .option("header", header)\
    .option("timestampFormat", timestampFormat)\
    .option("maxFilesPerTrigger", maxFilesPerTrigger)\
    .load(f"{path}/*.{file_type}")

df = df.filter(...)                        # filter predicate
df = df.withColumn(...)                    # repeated for ~20 derived feature columns
df = df.withWatermark(event_time_col, watermark)
df = df.groupBy(x).agg(...)                # 22 features in the aggregation
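One thing worth noting about the settings above: assuming lookback_window and sliding_interval feed a window() sliding-window aggregation (the post does not show the window() call itself), each incoming row is assigned to window/slide overlapping windows, which multiplies the rows kept in the state store. A quick illustrative check of that multiplier in plain Python:

```python
from datetime import timedelta

def windows_per_event(window: timedelta, slide: timedelta) -> int:
    """Number of overlapping sliding windows a single event falls into.

    Spark assigns an event to every window whose span covers its timestamp;
    for a sliding window that is window / slide (assuming slide divides window).
    """
    return int(window / slide)

# With this job's settings: a 24-hour window sliding every 1 hour,
# so every row contributes to 24 window groups held in state.
multiplier = windows_per_event(timedelta(hours=24), timedelta(hours=1))
print(multiplier)
```

With 22 aggregated features per group, that 24x fan-out is a plausible driver of the large state objects seen in the heap dump.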


def foreach_function(agg_df, batch_no):
    t0 = time.time()
    logger.info(f"Writing batch {batch_no} to {write_minio_path}")
    agg_df.write.mode("append").parquet(write_minio_path)
    logger.info(f"Batch {batch_no} time taken: {time.time() - t0:.1f}s")

logger.info("Calling Foreach Batch Function")
query = (
    df.writeStream
    .foreachBatch(foreach_function)
    .option("checkpointLocation", checkpoint_path)
    .outputMode(output_mode)
    .start()
)
query.awaitTermination()

The code above runs out of memory with 3 executors, each with 30 GB of memory and 5 GB of memory overhead. The driver has 10 GB of memory and 1 GB of overhead.
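For reference, the resources described above correspond roughly to the following standard Spark configuration properties (values taken from the post):

```
spark.executor.instances        3
spark.executor.memory           30g
spark.executor.memoryOverhead   5g
spark.driver.memory             10g
spark.driver.memoryOverhead     1g
```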

23/09/20 14:09:43 ERROR TaskSchedulerImpl: Lost executor 4 on IP: The executor with id 4 exited with exit code 52(JVM OOM).



The API gave the following container statuses:


         container name: spark-kubernetes-executor
         container image: spark:3.3.0
         container state: terminated
         container started at: 2023-09-20T14:01:46Z
         container finished at: 2023-09-20T14:09:41Z
         exit code: 52
         termination reason: Error

23/09/20 14:09:49 ERROR FileFormatWriter: Aborting job 757c5b44-f7c9-436a-874b-295370b1ab97. org.apache.spark.SparkException: Job aborted due to stage failure: ShuffleMapStage 87 (start at NativeMethodAccessorImpl.java:0) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.FetchFailedException

We are using aws-sdk-java-1.11.0 and hadoop-aws-3.3.0. Could a thread leak in these jars be causing the OOM?

We inspected an executor heap dump and found that the state objects were the largest objects in memory. How can we calculate the size of the state that needs to be held in executor memory?

Please help me debug this.
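On the state-size question: besides heap dumps, Structured Streaming reports state-store metrics on every micro-batch through the query progress object (in PySpark, query.lastProgress is a dict whose stateOperators entries include numRowsTotal and memoryUsedBytes). A minimal sketch of summing them; the sample figures below are made up:

```python
# Summarize state-store memory from a StreamingQueryProgress payload,
# e.g. query.lastProgress in PySpark, or the JSON emitted by a
# StreamingQueryListener. The numbers here are illustrative only.
sample_progress = {
    "stateOperators": [
        {"numRowsTotal": 1_200_000, "memoryUsedBytes": 6_442_450_944},
    ]
}

def state_memory_bytes(progress: dict) -> int:
    """Total bytes held by all stateful operators in one micro-batch."""
    return sum(op["memoryUsedBytes"] for op in progress.get("stateOperators", []))

print(state_memory_bytes(sample_progress) / 2**30, "GiB of state")
```

Tracking this figure per batch shows whether state grows without bound (e.g. a watermark that never advances) or simply exceeds what 3 executors can hold.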

apache-spark amazon-s3 pyspark spark-structured-streaming minio
1 answer

0 votes

Well, in my experience:

  1. maxFilesPerTrigger is harder to predict than maxBytesPerTrigger, because input files can vary widely in size.
  2. Even with maxFilesPerTrigger, 200 files may be too many for 3 executors. I aim for an approximate micro-batch size (file- or byte-based) of less than half of sum(executor_memory). In your case that should be <= 45G. If 200 files exceed that number, then problems are likely.
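The heuristic in point 2 is simple arithmetic; a small sketch that reproduces the 45G figure for 3 executors with 30 GB each:

```python
def max_microbatch_gb(num_executors: int, executor_memory_gb: float) -> float:
    """Upper bound for micro-batch input size per the answer's rule of
    thumb: keep each micro-batch under sum(executor_memory) / 2."""
    return num_executors * executor_memory_gb / 2

# The asker's cluster: 3 executors x 30 GB each.
print(max_microbatch_gb(3, 30))  # -> 45.0
```

If the 200 files admitted per trigger total more than this, lowering maxFilesPerTrigger (or adding executors) is the first thing to try.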

An unrelated note: why are you using foreachBatch? In your case it can be simplified to something like:


 df.writeStream.start(
   path=write_minio_path, 
   outputMode="append", 
   format="parquet",
   checkpointLocation=checkpoint_path
 ).awaitTermination()
