Spark 结构化流 - 检查点元数据无限增长

问题描述 投票:0回答:2

我使用spark结构流3.1.2。我需要使用 s3 来存储检查点元数据(我知道,这不是检查点元数据的最佳存储)。压缩间隔是10(默认),我设置了

spark.sql.streaming.minBatchesToRetain=5
。当作业运行几周后,检查点时间显着增加(导致处理延迟几分钟)。我查看了检查点元数据结构。那里有一条沉重的路:
checkpoint/source/0
。单个 .compact 文件重 25GB。我查看了它的内容,它包含自批次 0 以来的所有条目(当前批次约为 25000)。

我尝试了一些参数来从紧凑文件中删除已处理的数据,即:

spark.cleaner.referenceTracking.cleanCheckpoints=true
- 不起作用。正如我在代码中看到的,它与以前版本的流媒体相关,不是吗?
spark.sql.streaming.fileSource.log.deletion=true
spark.sql.streaming.fileSink.log.deletion=true
不起作用。

即使所有数据都已处理(最近的检查点除外),紧凑文件也会存储完整的历史记录,因此我预计大多数条目都会被删除。是否有任何参数可以从紧凑文件中删除条目或不时优雅地删除紧凑文件?

现在我正在测试场景,当我停止作业时,删除大部分

checkpoint/source/0/*
文件,仅保留一些最近的检查点(未压缩),然后重新运行作业。作业从最近的检查点正确恢复。当涉及检查点压缩时,它会失败并丢失最近的压缩文件。我可能需要编辑最近的紧凑文件(而不是删除它)并只保留一些最近的记录。它看起来像是我的问题的可能解决方法,但是这种手动删除检查点文件的场景看起来很难看,所以我更喜欢由 Spark 管理的东西。

apache-spark spark-streaming spark-structured-streaming spark-checkpoint
2个回答
3
投票

对于后代:问题是

FileStreamSourceLog
类。我需要覆盖方法
shouldRetain
,默认情况下返回 true,其文档说:

默认实现保留所有日志条目。实现应该重写该方法来改变行为。


0
投票

请参阅 databricks 文档。主要是 .option('retention',retention) 解决了同样的问题。

© www.soinside.com 2019 - 2024. All rights reserved.