Kafka流式处理-默认情况下,禁止备份状态存储变更日志主题没有保留期

问题描述 投票:0回答:1

当前正在使用DSL kafka流抑制功能来存储中间聚合结果。

观察到,默认情况下,用于抑制功能的更改日志主题(summary-generator-KTABLE-SUPPRESS-STATE-STORE-0000000004-changelog)下没有保留期,即使窗口已经提前,它也会发出较旧的记录。如果节点由于内存问题而崩溃并再次重新启动,则会发生这种情况。

几天前将应用程序投入生产,并观察到以下问题。

1)状态存储-抑制所支持的changelog主题正在快速增长,因为未删除较旧的记录。

2)有时,由于内存不足,聚合服务在某些节点上崩溃,导致将状态复制到其他服务器,这给那些节点增加了更多的处理压力,这也导致了这些节点的崩溃。

您能否在下面帮助我理解。

1)如何为内部创建的状态存储变更日志主题添加保留期?如果不可行,那么可以由代理配置来驱动吗?

2)万一任何节点上的聚合过程崩溃,该节点上的Task都会与重建状态存储一起重新定位到另一个节点,所有状态存储是否与另一个节点的内存不同步,并且应用程序引用它进行进一步聚合?如果是,则回答崩溃的第二节点聚合过程。

或仅在应用程序请求时才将特定的中间结果与内存同步?

3)由于内存不足而需要重置状态,因此怀疑状态存储已损坏。

最近向所有节点添加了更多内存。谢谢您的帮助。

代码-

KTable,JsonNode>聚合=交易.groupByKey().windowedBy(TimeWindows.of(Duration.ofSeconds(windowDuration))。grace(Duration.ofSeconds(windowGraceDuration))).aggregate(()-> new AggregationService()。buildInitialStats(),(键,交易,previousStats)->新的AggregationService()。buildProfileStatistics(键,交易,previousStats,runByUnit),实体化。> as(statStoreName).withRetention(Duration.ofSeconds((windowDuration + windowGraceDuration + windowRetentionDuration)))).withKeySerde(Serdes.String()).withValueSerde(jsonSerde)).suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));

apache-kafka-streams
1个回答
0
投票

我正在使用kafka流2.1.1版本,并发现了与抑制相关的错误,该错误后来解决到2.2.1和更高版本中。

https://issues.apache.org/jira/plugins/servlet/mobile#issue/KAFKA-7895

请让我知道2.3.o是否可以解决此问题?

观察到的模拟器问题主要是在重新启动节点后抑制多次发出属于较旧窗口的记录,并且考虑到大容量(较旧和当前的窗口记录),与内存不足问题有关]

© www.soinside.com 2019 - 2024. All rights reserved.