带有窗口的Kafka Streams拓扑不会触发状态更改

问题描述 投票:3回答:1

我正在构建以下Kafka Streams拓扑(伪代码):

gK = builder.stream().gropuByKey();
g1 = gK.windowedBy(TimeWindows.of("PT1H")).reduce().mapValues().toStream().mapValues().selectKey();
g2 = gK.reduce().mapValues();
g1.leftJoin(g2).to();

[如果您注意到,这是一种菱形拓扑,从单个输入主题开始,以单个输出主题结束,消息流经两个并行流,最终在最后合并在一起。一种流适用(滚动?)加窗,另一种不适用。流的两个部分都在同一个键上工作(除了窗口中间引入的WindowedKey之外)。

我的消息的时间戳是事件时间。也就是说,它们是通过我的自定义配置TimestampExtractor实现从消息正文中选取的。我邮件中的实际时间戳是过去的几年。

在我的带有两个输入/输出消息的单元测试中,以及在运行时环境(使用真实的Kafka)中,所有这些功能乍一看都很好。

[消息数量开始显着增加(例如40K)时似乎出现了问题。

我失败的情况如下:

  1. ~~ 40K记录,其中same键首先上传到输入主题中

  2. 〜40K更新为从输出主题出来,如预期的那样

  3. 另一个〜40K记录与第1步相同但不同)输入主题

  4. 只有〜100个更新来自输出主题,而不是预期的新〜40K更新。没有什么特别的看到这些〜100个更新,它们的内容似乎是正确的,但是仅在某些时间范围内。对于其他时间窗口,没有即使应明确定义流逻辑和输入数据也要进行更新生成40K条记录。实际上,当我在步骤1中交换数据集时)和3)我的情况完全相同,来自的〜40K更新第二个数据集,且与第一个数据集的编号相同〜100。

我可以在本地使用TopologyTestDriver的单元测试中轻松重现此问题(但仅在大量输入记录上使用。)>

在我的测试中,我尝试使用StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG禁用缓存。不幸的是,这没有任何区别。

UPDATE

我尝试了reduce()调用和aggregate()调用。在两种情况下,问题仍然存在。

我还需要注意的是,在将StreamsConfig.TOPOLOGY_OPTIMIZATION设置为StreamsConfig.OPTIMIZE的情况下,如果没有它,则在调试器before]中调用mapValues()处理程序。至少是第一次。我没想到。

不幸的是,join()和leftJoin()都尝试过相同的结果。在调试器中,数据的第二部分根本不会触发“左”流程中的reduce()处理程序,但是会触发“右”流程中的reduce()处理程序。

通过我的配置,如果两个数据集中的数量或记录中的每一个都为100,则问题不会自行显现,我将得到200条输出消息,正如我期望的那样。当我在每个数据集中将数字增加到200时,我收到的预期消息少于400。因此,目前看来,诸如“旧”窗口之类的东西将被删除,而那些旧窗口的新记录将被流忽略。有可以设置的窗口保留设置,但是使用我使用的默认值,我希望窗口能够保留其状态并保持活动至少12小时(这远远超出了单元测试的运行时间)。

尝试使用以下窗口存储配置修改左减速器:

Materialized.as(
    Stores.inMemoryWindowStore(
        "rollup-left-reduce",
        Duration.ofDays(5 * 365),
        Duration.ofHours(1), false)
)

结果仍然没有差异。

即使只有单个“左”流而没有“右”流,也没有join(),相同的问题仍然存在。看来问题出在我设置的窗口保留设置中。我的输入记录的时间戳(事件时间)跨越2年。第二个数据集再次从2年开始。 Kafka Streams中的这个位置可确保第二个数据集记录被忽略:

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java#L125

Kafka Streams版本是2.4.0。也使用Confluent依赖版本5.4.0。

我的问题是

  • 这种行为可能是什么原因?
  • 我是否错过了流拓扑中的任何内容?
  • 这样的拓扑是否应该可以工作?
  • 我正在构建以下Kafka Streams拓扑(伪代码):gK = builder.stream()。gropuByKey(); g1 = gK.windowedBy(TimeWindows.of(“ PT1H”))。reduce()。mapValues()。toStream()。mapValues()。selectKey()...

apache-kafka apache-kafka-streams
1个回答
0
投票

经过一段时间的调试后,我找到了造成问题的原因。

我的输入数据集包含带有跨2年时间戳记的记录。我正在加载第一个数据集,并从输入数据集中将流的“观察”时间设置为最大时间戳。

© www.soinside.com 2019 - 2024. All rights reserved.