我正在构建一个Kafka Streams应用程序,它通过将每个新计算对象与最后一个已知对象进行比较来生成更改事件。
因此,对于输入主题上的每条消息,我更新状态存储中的对象并且每隔一段时间(使用标点符号),我对此对象应用计算并将结果与先前的计算结果进行比较(来自另一个状态存储) )。
为了确保此操作是一致的,我在标点符号触发后执行以下操作:
context.forward
他们。所以事件转到结果主题。我将此元组用于应用程序崩溃或重新平衡的场景,因此我可以在继续之前发送正确的事件集。
现在,我注意到结果事件并不总是一致的,特别是如果应用程序经常重新平衡。在极少数情况下,Kafka Streams应用程序会向结果主题发出事件,但更改日志主题尚未更新。换句话说,我为结果主题制作了一些内容,但我的更改日志主题尚未处于同一状态。
所以,当我执行stateStore.put()
并且方法调用成功返回时,是否有任何保证何时会出现在changelog主题上?
我可以强制执行changelog flush吗?当我做context.commit()
时,什么时候会刷新+提交?
要获得完全一致性,您需要启用processing.guarantee="exaclty_once"
- 否则,如果发生潜在错误,您可能会得到不一致的结果。
如果您希望使用“at_least_once”,则可能需要使用单个存储,并在处理完成后更新存储(即,在调用forward()
之后)。这最小化了时间窗口以获得不一致。
是的,如果您在提交输入主题偏移之前调用context.commit()
,则所有存储将刷新到磁盘,并且所有待处理的生成器写入也将被刷新。