在我的 Spring Boot 应用程序中,我使用了 kafka 流。它首先按key对来自某个主题的消息进行分组,按照一定的时间间隔对它们进行窗口化,使用reduce仅保留每个key的最新消息并处理,最后将每个key的时间窗口内的最新消息发送到另一个队列时间窗口。
stream.groupByKey()
.windowedBy(window)
.reduce((oldValue, newValue) -> newValue, materialized)
.toStream()
.process(() -> /*process and send message to a queue*/);
它仅适用于一个实例,但当实例数量增加时,每个实例都会处理相同的消息,并且在队列中,我会在同一时间窗口看到同一键的多条消息。
我只想要一条与时间间隔中的键对应的消息。我的目标是,如果一个实例使用某个键处理消息,则其他实例不应处理该键的消息。有没有可能的方法可以在没有消息处理逻辑的自定义实现的情况下做到这一点?
您需要考虑两件事:
Kafka Streams 假设输入数据按键分区。如果不是这种情况,您需要将
groupByKey()
替换为 groupBy((k,v) -> k))
或 repartition().groupByKey()
。如果数据未按键分区,则单个键可能会获得多个窗口。
Kafka Streams 默认情况下不会为窗口聚合发出单个“最终”结果,但只要窗口打开,它就会不断细化结果,并发出中间结果。如果您想要每个窗口一个结果,您可以使用
.reduce(...).suppress(...)
或 .windowBy(...).emitStrategy(...)
(查看文档以获取有关这两个选项的详细信息)。