我有 2 个 Kafka 主题,它们从不同的来源流式传输完全相同的内容,因此在其中一个来源发生故障时我可以拥有高可用性。 我正在尝试使用 Kafka Streams 0.10.1.0 将 2 个主题合并为 1 个输出主题,这样我就不会错过任何失败消息,并且当所有源都启动时不会出现重复。
当使用KStream的
leftJoin
方法时,其中一个主题(次要主题)可以毫无问题地宕机,但是当主主题宕机时,不会向输出主题发送任何内容。这似乎是因为,根据 Kafka Streams 开发人员指南,
KStream-KStream leftJoin 始终由来自主流的记录驱动
因此,如果没有来自主流的记录,即使存在,也不会使用来自次流的记录。一旦主要流恢复在线,输出就会恢复正常。
我还尝试使用
outerJoin
(添加重复记录),然后转换为 KTable 和 groupByKey 来消除重复项,
KStream mergedStream = stream1.outerJoin(stream2,
(streamVal1, streamVal2) -> (streamVal1 == null) ? streamVal2 : streamVal1,
JoinWindows.of(2000L))
mergedStream.groupByKey()
.reduce((value1, value2) -> value1, TimeWindows.of(2000L), stateStore))
.toStream((key,value) -> value)
.to(outputStream)
但我仍然偶尔会收到重复的内容。我还使用
commit.interval.ms=200
来让 KTable 足够频繁地发送到输出流。
处理此合并以从多个相同的输入主题获取一次性输出的最佳方法是什么?
更新
从 Kafka 3.1 开始,流-流左/外连接的语义发生了变化,从“渴望发出”(这可能导致虚假的左/外连接结果)到“关闭时发出”。使用新的语义,使用
outerJoin
不应导致任何重复。
原答案
使用任何类型的连接都无法解决您的问题,因为您总是会得到丢失结果(内部连接,以防某些流停滞)或“重复”
null
(左连接或外部连接,以防某些流停滞)两个流都在线)。有关 Kafka Streams 中连接语义的详细信息,请参阅 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics。
因此,我建议使用处理器 API,您可以使用 KStream
process()
、
transform()
或
transformValues()
与 DSL 混合搭配。有关更多详细信息,请参阅如何使用 Kafka Stream DSL 通过处理器过滤键和值。 您还可以向处理器添加自定义存储(如何将自定义 StateStore 添加到 Kafka Streams DSL 处理器?