我有三个主题和一个从中读取的 KStream 应用程序。
x、y 和 z 将收到消息,我想使用 KStream 应用程序中的窗口处理它们。
关键是,在同一个窗口中,有时我会同时收到来自一个主题、两个主题或三个主题的消息:
Window 1 ---> merged(z)
Window 2 ---> merged(x + z)
Window 3 ---> merged(x + y + z)
Window 4 ---> merged(x)
...
能否在一个窗口中同时加入两个以上的topic(KStreams)?
是否支持上图窗口的处理?
更新: 到目前为止我试过这个:
public KStream<String, String> joinedKStream(StreamsBuilder kStreamBuilder) {
KStream<String, String> streamX = kStreamBuilder.stream("x");
KStream<String, String> streamY = kStreamBuilder.stream("y");
KStream<String, String> streamZ = kStreamBuilder.stream("z");
KStream<String, String> stream = streamX
.leftJoin(streamY, (s, s2) -> {
logger.info("Join x + y: {}-{}", s, s2);
return s + s2;
}, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(20)))
.leftJoin(streamZ, (s, s2) -> {
logger.info("Join xy + z: {}-{}", s, s2);
return s + s2;
}, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(20)));
stream.to("merged");
return stream;
}
如果我只在主题 z 中发布一条消息“msg”,我期待一条由 nullnullmsg 组成的消息。然而,当唯一的输入是 z 时,它似乎并没有合并。
似乎需要在主题 x 中有一条消息才能使加入工作,但我需要从任何主题加入一个窗口。
更新 2
逻辑上我需要一个 outerJoin:
@Bean
public KStream<String, String> joinedKStream(StreamsBuilder kStreamBuilder) {
KStream<String, String> streamX = kStreamBuilder.stream("x");
streamX.peek((k, v) -> logger.info("x - key {} - value {}", k, v));
KStream<String, String> streamY = kStreamBuilder.stream("y");
streamY.peek((k, v) -> logger.info("y - key {} - value {}", k, v));
KStream<String, String> streamZ = kStreamBuilder.stream("z");
streamZ.peek((k, v) -> logger.info("z - key {} - value {}", k, v));
//JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5)).grace(Duration.ofSeconds(1));
JoinWindows joinWindow = JoinWindows.ofTimeDifferenceAndGrace(Duration.ofSeconds(5), Duration.ofSeconds(10));
KStream<String, String> stream = streamX
.outerJoin(streamY, (k, s, s2) -> {
logger.info("Join x + y: key {} - values {}-{}", k, s, s2);
return s + s2;
}, joinWindow)
.outerJoin(streamZ, (k, s, s2) -> {
logger.info("Join xy + z: key {} - values {}-{}", k, s, s2);
return s + s2;
}, joinWindow);
stream.foreach((key, value) -> logger.info("Merged key {} - value {}", key, value));
stream.to("merged");
return stream;
但是还有一个问题:
我发送“嘿”到主题 y 和“你!”主题 z。很长一段时间过去了,合并主题中什么也没有出现。
在某些时候,在发布更多消息后(几分钟后),执行连接并且
nullheyyo!
出现在合并的主题中。为什么会延迟?
更重要的是,如果在任何主题中都没有发布已知消息,则不会加入以前的消息。我预计在窗口期结束后,合并将自动刷新。
这是一个正在发生的事情的例子:
x - key 1 - value 1
x - key 2 - value 2
Join x + y: key 1 - values 1-null
x - key 3 - value 3
Join x + y: key 2 - values 2-null
Merged key 1 - value 1nullnull
最后我们得到合并后的日志,将加入的记录刷入合并后的主题
???