我的应用程序使用kafka流suppress
逻辑。
我想使用抑制来测试kafka流拓扑。
正在运行uinit测试,我的拓扑未发出结果。
Kafka流逻辑
...
.suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(5), Suppressed.BufferConfig.maxBytes(1_000_000_000L).emitEarlyWhenFull()))
...
我的测试用例代码。创建输入数据后,正在运行的测试用例无法读取抑制逻辑输出记录。只需返回null
testDriver.pipeInput(recordFactory.create("input", key, dummy, 0L));
System.out.println(testDriver.readOutput("streams-result", Serdes.String().deserializer(), serde.deserializer()));
我可以测试抑制逻辑吗?
简单的答案是肯定的。
一些好的参考文献是Confluent Example Tests此示例特别测试抑制功能。其他许多示例始终是首先检查的好地方。这是用Kotlin编写的另一个示例。
有关此功能及其测试的说明,可在此blog post的第3页中找到
一些关键点:
props [StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG] = 0props [StreamsConfig.COMMIT_INTERVAL_MS_CONFIG] = 5
希望这会有所帮助。