Kafka Streams中的重建状态存储是否会将重复的记录传播到下游主题?

问题描述 投票:0回答:2

我目前正在将Kafka Streams用于有状态的应用程序。该状态虽然not存储在Kafka状态存储中,但目前仅存储在内存中。这意味着每当我重新启动应用程序时,所有状态都会丢失,必须通过从头开始处理所有记录来重建它。

在对Kafka状态存储进行了一些研究之后,这似乎正是我正在寻找的在应用程序重新启动之间(在内存或磁盘中)保持状态的解决方案。但是,我发现在线资源缺少一些非常重要的细节,因此我仍然对如何使用这些资源有一些疑问:

  • 如果流设置为从偏移量latest开始,是否仍将根据所有先前的记录来计算状态?
  • 如果需要重新处理以前已经处理过的记录以重建状态,这将通过其余Streams拓扑传播记录(例如InputTopic->有状态处理器-> OutputTopic,这将导致OutputTopic中的记录重复,因为重建状态)?
apache-kafka apache-kafka-streams restore stateful
2个回答
2
投票

状态存储使用自己的changelog主题,而kafka-streams状态存储负责从中进行加载。如果您的状态存储未初始化,则您的kafka-streams应用将使用EARLIEST从changelog主题重新为其本地状态存储补水,因为它必须读取每条记录。

这意味着一个全新实例的启动顺序大致为:

  • 观察没有本地状态存储缓存
  • 通过使用来自状态库的更改日志主题来加载本地状态存储(状态存储的主题名称为<state-store-name>-changelog
  • 读取每个记录并相应地更新本地rocksDB实例
  • 不发出任何东西,因为这是应用程序服务,而不是您的实际拓扑
  • 根据配置拓扑的方式,使用EARLIESTLATEST读取消费组偏移。如果您的消费群体还没有任何补偿,这不是唯一的问题
  • 处理内容,根据拓扑发出记录

是否将实际拓扑的auto.offset.reset设置为LATESTEARLIEST取决于您。如果它们丢失了,或者您创建了一个新组,则在可能跳过的记录(LATEST)与处理旧记录的重复处理与重复数据删除(EARLIEST)之间保持平衡。

长话短说:状态恢复与处理不同,由kafka-streams对其自身进行处理。


0
投票

如果流设置为从偏移量latest开始,是否仍将根据所有先前的记录来计算状态?

如果您要重新启动同一应用程序(例如,在之前停止运行之后),则通过重新处理原始输入数据来重新计算状态[[不会]]。相反,状态将从其“备份”中恢复(每个状态存储或KTable都持久存储在Kafka主题中,为此目的该表/状态存储的所谓“ changelog主题”),以便其数据准确无误。停止应用程序时的状态。这种行为使您可以无缝地停止+重新启动应用程序,而不会跳过在“停止”和“重新启动”之间到达的记录。但是您需要注意一个不同的警告:设置偏移起始点(latestearliest)的配置仅在您运行Kafka Streams应用程序时使用[。之后,每当您停止+重新启动应用程序时,它将始终在先前停止的位置继续。这是因为,如果该应用程序至少运行了一次,则它已将其消费者补偿信息存储在Kafka中,这使它可以知道重新启动后从何处自动恢复操作。

如果您需要始终(重新)从头开始的其他行为,例如latest偏移量(因此可能会跳过在停止应用程序与重新启动应用程序之间到达的记录),因此必须为reset your Kafka Streams application。重置工具执行的步骤之一是从Kafka删除应用程序的使用者偏移信息,这使应用程序认为它从来没有启动过。]

如果需要重新处理以前已经处理过的记录以重建状态,这将通过其余Streams拓扑传播记录(例如InputTopic->有状态处理器-> OutputTopic,这将导致OutputTopic中的记录重复,因为重建状态)?

如上所述,默认情况下不会进行此重新处理。在应用程序停止时,状态将自动重建为其先前的状态(双关语意)。

仅当您手动重置应用程序(见上文)并且例如配置应用程序以重新读取历史数据(例如在执行重置后将auto.offset.reset设置为earliest)。

© www.soinside.com 2019 - 2024. All rights reserved.