我可以测试kafka流抑制逻辑吗?

问题描述 投票:0回答:1

我的应用程序使用kafka流suppress逻辑。

我想使用抑制来测试kafka流拓扑。

正在运行uinit测试,我的拓扑未发出结果。

Kafka流逻辑

...
.suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(5), Suppressed.BufferConfig.maxBytes(1_000_000_000L).emitEarlyWhenFull()))
...

我的测试用例代码。创建输入数据后,正在运行的测试用例无法读取抑制逻辑输出记录。只需返回null

testDriver.pipeInput(recordFactory.create("input", key, dummy, 0L));

System.out.println(testDriver.readOutput("streams-result", Serdes.String().deserializer(), serde.deserializer()));

我可以测试抑制逻辑吗?

apache-kafka apache-kafka-streams
1个回答
0
投票

简单的答案是肯定的。

一些好的参考文献是Confluent Example Tests此示例特别测试抑制功能。其他许多示例始终是首先检查的好地方。这是用Kotlin编写的另一个示例。

有关此功能及其测试的说明,可在此blog post的第3页中找到

一些关键点:

  • 该窗口将仅按文档预期发出最终结果。
  • 要刷新最终结果,您将需要发送一个额外的虚拟事件,如示例所示(例如,汇合here
  • 您将需要操纵事件时间来测试它,因为抑制可以抵消事件时间,可以通过测试输入主题API或使用自定义的[[TimestampExtractor来提供。
  • 为了进行测试,建议您设置以下内容以删除缓存并减少提交间隔。

    props [StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG] = 0props [StreamsConfig.COMMIT_INTERVAL_MS_CONFIG] = 5

希望这会有所帮助。

© www.soinside.com 2019 - 2024. All rights reserved.