I am running a Spark Structured Streaming job that reads data from an object-storage bucket, applies some transformations and filtering, then calls groupBy with aggregations and publishes the output to object storage in CSV format.
maxFilesPerTrigger = 200
lookback_window = "24 hours"
sliding_interval = "1 hour"
watermark = "1 hour"
output_mode = "update"
df = spark \
    .readStream \
    .format(file_type) \
    .schema(schema) \
    .option("latestFirst", latest_first) \
    .option("maxFileAge", maxFileAge) \
    .option("header", header) \
    .option("timestampFormat", timestampFormat) \
    .option("maxFilesPerTrigger", maxFilesPerTrigger) \
    .load(f"{path}/*.{file_type}")
df = df.filter(...)                    # filtering conditions
df = df.withColumn(...)                # ~20 derived feature columns
df = df.withWatermark(...)             # watermark = "1 hour"
df = df.groupBy(x).agg(...)            # 22 features in the aggregation
def foreach_function(agg_df, batch_no):
    global write_minio_path
    t0 = time.time()
    logger.info(f"Writing batch - {batch_no} to {write_minio_path}")
    agg_df.write.mode("append").parquet(write_minio_path)
    logger.info(f"Batch - {batch_no} time taken :: {time.time() - t0}")
logger.info("Calling Foreach Batch Function")
query = df.writeStream \
    .foreachBatch(foreach_function) \
    .option("checkpointLocation", checkpoint_path) \
    .outputMode(output_mode) \
    .start()
query.awaitTermination()
The above code runs out of memory with 3 executors, each with 30 GB of memory and 5 GB of memory overhead. The driver has 10 GB of memory with 1 GB of overhead.
23/09/20 14:09:43 ERROR TaskSchedulerImpl: Lost executor 4 on IP: The executor with id 4 exited with exit code 52(JVM OOM).
The API gave the following container statuses:
container name: spark-kubernetes-executor
container image: spark:3.3.0
container state: terminated
container started at: 2023-09-20T14:01:46Z
container finished at: 2023-09-20T14:09:41Z
exit code: 52
termination reason: Error
23/09/20 14:09:49 ERROR FileFormatWriter: Aborting job 757c5b44-f7c9-436a-874b-295370b1ab97. org.apache.spark.SparkException: Job aborted due to stage failure: ShuffleMapStage 87 (start at NativeMethodAccessorImpl.java:0) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.FetchFailedException
We are using aws-sdk-java-1.11.0 and hadoop-aws-3.3.0. Could a thread leak in these jars be causing the OOM?

We took a heap dump of an executor, and the largest objects in memory were state objects. How can I calculate the size of the state that has to be held in executor memory?
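On the state-size question, a back-of-envelope upper bound is keys × overlapping windows × per-row buffer size: with a 24-hour window sliding every hour, each key is live in ~24 window instances at once, and each state row carries the 22 aggregation buffers. A minimal sketch, where the 1M key cardinality, ~8 bytes per aggregate value, and ~100 bytes of per-row overhead are all assumptions:

```python
# Back-of-envelope estimate of streaming-state size for a sliding-window
# aggregation. The window/slide/column counts come from the job config;
# key cardinality and byte sizes are assumed for illustration.

def estimate_state_bytes(distinct_keys, window_hours, slide_hours,
                         agg_columns, bytes_per_value=8, row_overhead=100):
    # One state row per key per overlapping window instance.
    windows_per_key = window_hours // slide_hours          # 24h / 1h = 24
    rows = distinct_keys * windows_per_key
    # Each row stores the aggregation buffers plus key/window/versioning overhead.
    bytes_per_row = agg_columns * bytes_per_value + row_overhead
    return rows * bytes_per_row

# Hypothetical cardinality of the grouping key "x":
est = estimate_state_bytes(distinct_keys=1_000_000, window_hours=24,
                           slide_hours=1, agg_columns=22)
print(est / 1024**3)  # roughly 6 GiB, before state-store copies/serialization
```

At runtime you can compare such an estimate against what Spark itself reports: each entry of `query.lastProgress["stateOperators"]` includes `numRowsTotal` and `memoryUsedBytes` for the stateful operator.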
Please help me debug this.
Hmm, from my experience `maxFilesPerTrigger` is less predictable than `maxBytesPerTrigger`, and 200 files may well be too many for 3 executors anyway. I aim for the approximate micro-batch size (file-based or byte-based) to stay under sum(executor_memory)/2 — in your case that would be <= 45 GB. If 200 files exceed that number, then problems are likely.

Unrelated note - why are you using `foreachBatch`? In your case it can be simplified to something like
df.writeStream.start(
    path=write_minio_path,
    outputMode="append",
    format="parquet",
    checkpointLocation=checkpoint_path
).awaitTermination()
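If you do move to byte-based admission, the option slots into the same reader. A sketch only: the `20g` cap is an assumed value derived from the <= 45 GB budget above, and on the plain file source `maxBytesPerTrigger` is only available in newer Spark releases (Delta Lake has supported it longer), so on Spark 3.3.0 the fallback is simply a lower `maxFilesPerTrigger`:

```python
df = (spark.readStream
      .format(file_type)
      .schema(schema)
      .option("maxBytesPerTrigger", "20g")  # cap micro-batch input volume (assumed value)
      .load(f"{path}/*.{file_type}"))
```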