我需要重新排序来自两个主题的数据(使用外部联接合并)。使用StateStore
保持最新序列并使用重新排序的消息修改下游流值是一种好习惯。
简化问题:
(来自主题A的seq,来自主题B的seq) - >输出的新seq(保持当前序列在StateStore
中)
(10,100) -> 1
(11,101) -> 2
(12,102) -> 3
(...,...) -> ...
新序列将存储为stateStore中键“currentSeq”的值。序列将在每条消息上递增并存储回stateStore。
您应该使用具有已注册(可能是自定义)状态的Processor API。
您还可以使用process()
,transform()
或transformValue()
将Processor API与DSL混合搭配,并引用状态存储(按名称)。
看到