StateStoreSupplier在KafkaStreams中存储序列

问题描述 投票:1回答:1

我需要重新排序来自两个主题的数据(使用外部联接合并)。使用StateStore保持最新序列并使用重新排序的消息修改下游流值是一种好习惯。

简化问题:

(来自主题A的seq,来自主题B的seq) - >输出的新seq(保持当前序列在StateStore中)

(10,100) -> 1

(11,101) -> 2

(12,102) -> 3

(...,...) -> ...

新序列将存储为stateStore中键“currentSeq”的值。序列将在每条消息上递增并存储回stateStore。

apache-kafka-streams
1个回答
2
投票

您应该使用具有已注册(可能是自定义)状态的Processor API。

您还可以使用process()transform()transformValue()将Processor API与DSL混合搭配,并引用状态存储(按名称)。

看到

© www.soinside.com 2019 - 2024. All rights reserved.