我编写了一个流,它接收消息并发送已出现的键表。如果出现某些内容,它将显示计数为 1。这是我的生产代码的简化版本,用于演示该错误。在实时运行中,每收到一条消息就会发送一条消息。
但是,当我使用 ProcessorTopologyTestDriver 在单元测试中运行它时,我得到了不同的行为。如果收到之前已经见过的密钥,我会收到一条额外的消息。
如果我使用键“key1”、然后“key2”、然后“key1”发送消息,我会得到以下输出。
key1 - 1
key2 - 1
key1 - 0
key1 - 1
由于某种原因,它会先递减该值,然后再将其添加回去。这种情况仅在使用 ProcessorTopologyTestDriver 时发生。这是预期的吗?有解决办法吗?或者这是一个错误?
这是我的拓扑:
final StreamsBuilder builder = new StreamsBuilder();
KGroupedTable<String, String> groupedTable
= builder.table(applicationConfig.sourceTopic(), Consumed.with(Serdes.String(), Serdes.String()))
.groupBy((key, value) -> KeyValue.pair(key, value), Serialized.with(Serdes.String(), Serdes.String()));
KTable<String, Long> countTable = groupedTable.count();
KStream<String, Long> countTableAsStream = countTable.toStream();
countTableAsStream.to(applicationConfig.outputTopic(), Produced.with(Serdes.String(), Serdes.Long()));
这是我的单元测试代码:
TopologyWithGroupedTable top = new TopologyWithGroupedTable(appConfig, map);
Topology topology = top.get();
ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(config, topology);
driver.process(inputTopic, "key1", "theval", Serdes.String().serializer(), Serdes.String().serializer());
driver.process(inputTopic, "key2", "theval", Serdes.String().serializer(), Serdes.String().serializer());
driver.process(inputTopic, "key1", "theval", Serdes.String().serializer(), Serdes.String().serializer());
ProducerRecord<String, Long> outputRecord = driver.readOutput(outputTopic, keyDeserializer, valueDeserializer);
assertEquals("key1", outputRecord.key());
assertEquals(Long.valueOf(1L), outputRecord.value());
outputRecord = driver.readOutput(outputTopic, keyDeserializer, valueDeserializer);
assertEquals("key2", outputRecord.key());
assertEquals(Long.valueOf(1L), outputRecord.value());
outputRecord = driver.readOutput(outputTopic, keyDeserializer, valueDeserializer);
assertEquals("key1", outputRecord.key());
assertEquals(Long.valueOf(1L), outputRecord.value()); //this fails, I get 0. If I pull another message, it shows key1 with a count of 1
这是完整代码的存储库:
https://bitbucket.org/nsinha/testtopologywithgroupedtable/src/master/
更新
在 Kafka 3.5 中,此行为通过 KIP-904 得到了改进。如果从单个上游更新中为相同的键调用加法器和减法器(这不是一般情况,对于单个上游更新为不同的键调用减法器和加法器),现在仅发出单个结果记录。
原答案
这不是一个错误,而是设计的行为(参见下面的解释)。
行为差异是由于
KTable
状态存储缓存造成的(参见 https://docs.confluence.io/current/streams/developer-guide/memory-mgmt.html)。当您运行单元测试时,缓存会在每条记录后刷新,而在生产运行中,情况并非如此。如果您在生产运行中禁用缓存,我认为它的行为与单元测试中的行为相同。
旁注:
是一个内部类,不是公共API的一部分。因此,不存在兼容性保证。您应该使用官方的单元测试包:https://docs.confluence.io/current/streams/developer-guide/test-streams.htmlProcessorTopologyTestDriver
为什么会看到两条记录:
在您的代码中,您使用的是
KTable#groupBy()
,并且在您的特定用例中,您不更改密钥。但是,一般情况下,键可能会更改(取决于输入KTable
的值。因此,如果输入KTable
更改,下游聚合需要从聚合中删除/减去旧的键值对)结果,并将新的键值对添加到聚合结果中 - 一般情况下,新旧键值对的键不同,因此需要生成两条记录,因为减法和加法可能会在不同的实例上发生不同的键可能会以不同的方式进行散列。这有意义吗?
因此,对于输入
KTable
的每次更新,通常需要计算两个不同键值对上的两次更新 KTable
的结果。对于您的特定情况,其中密钥不会更改,Kafka Stream 会执行相同的操作(如果密钥实际上相同,则不会对这种情况进行检查/优化以将两个操作“合并”为一个)。