我正在构建以下Kafka Streams拓扑(伪代码):
gK = builder.stream().gropuByKey();
g1 = gK.windowedBy(TimeWindows.of("PT1H")).reduce().mapValues().toStream().mapValues().selectKey();
g2 = gK.reduce().mapValues();
g1.leftJoin(g2).to();
[如果您注意到,这是一种菱形拓扑,从单个输入主题开始,以单个输出主题结束,消息流经两个并行流,最终在最后合并在一起。一种流适用(滚动?)加窗,另一种不适用。流的两个部分都在同一个键上工作(除了窗口中间引入的WindowedKey之外)。
我的消息的时间戳是事件时间。也就是说,它们是通过我的自定义配置TimestampExtractor
实现从消息正文中选取的。我邮件中的实际时间戳是过去的几年。
在我的带有两个输入/输出消息的单元测试中,以及在运行时环境(使用真实的Kafka)中,所有这些功能乍一看都很好。
[消息数量开始显着增加(例如40K)时似乎出现了问题。
我失败的情况如下:
~~ 40K记录,其中same键首先上传到输入主题中
〜40K更新为从输出主题出来,如预期的那样
另一个〜40K记录与第1步相同但不同)输入主题
只有〜100个更新来自输出主题,而不是预期的新〜40K更新。没有什么特别的看到这些〜100个更新,它们的内容似乎是正确的,但是仅在某些时间范围内。对于其他时间窗口,没有即使应明确定义流逻辑和输入数据也要进行更新生成40K条记录。实际上,当我在步骤1中交换数据集时)和3)我的情况完全相同,来自的〜40K更新第二个数据集,且与第一个数据集的编号相同〜100。
我可以在本地使用TopologyTestDriver
的单元测试中轻松重现此问题(但仅在大量输入记录上使用。)>
在我的测试中,我尝试使用StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG
禁用缓存。不幸的是,这没有任何区别。
UPDATE
我尝试了reduce()调用和aggregate()调用。在两种情况下,问题仍然存在。
我还需要注意的是,在将StreamsConfig.TOPOLOGY_OPTIMIZATION
设置为StreamsConfig.OPTIMIZE
的情况下,如果没有它,则在调试器before]中调用mapValues()处理程序。至少是第一次。我没想到。
不幸的是,join()和leftJoin()都尝试过相同的结果。在调试器中,数据的第二部分根本不会触发“左”流程中的reduce()处理程序,但是会触发“右”流程中的reduce()处理程序。
通过我的配置,如果两个数据集中的数量或记录中的每一个都为100,则问题不会自行显现,我将得到200条输出消息,正如我期望的那样。当我在每个数据集中将数字增加到200时,我收到的预期消息少于400。因此,目前看来,诸如“旧”窗口之类的东西将被删除,而那些旧窗口的新记录将被流忽略。有可以设置的窗口保留设置,但是使用我使用的默认值,我希望窗口能够保留其状态并保持活动至少12小时(这远远超出了单元测试的运行时间)。
尝试使用以下窗口存储配置修改左减速器:
Materialized.as( Stores.inMemoryWindowStore( "rollup-left-reduce", Duration.ofDays(5 * 365), Duration.ofHours(1), false) )
结果仍然没有差异。
即使只有单个“左”流而没有“右”流,也没有join(),相同的问题仍然存在。看来问题出在我设置的窗口保留设置中。我的输入记录的时间戳(事件时间)跨越2年。第二个数据集再次从2年开始。 Kafka Streams中的这个位置可确保第二个数据集记录被忽略:
Kafka Streams版本是2.4.0。也使用Confluent依赖版本5.4.0。
我的问题是
我正在构建以下Kafka Streams拓扑(伪代码):gK = builder.stream()。gropuByKey(); g1 = gK.windowedBy(TimeWindows.of(“ PT1H”))。reduce()。mapValues()。toStream()。mapValues()。selectKey()...
经过一段时间的调试后,我找到了造成问题的原因。
我的输入数据集包含带有跨2年时间戳记的记录。我正在加载第一个数据集,并从输入数据集中将流的“观察”时间设置为最大时间戳。