我有三个主题和一个从中读取的 KStream 应用程序。
x、y 和 z 将收到消息,我想使用 KStream 应用程序中的窗口处理它们。
关键是,在同一个窗口中,有时我会同时收到来自一个主题、两个主题或三个主题的消息:
Window 1 ---> z
Window 2 ---> x + z
Window 3 ---> x + y + z
Window 4 ---> x
...
能否在一个窗口中同时加入两个以上的topic(KStreams)?
是否支持上图窗口的处理?
一些显示它的代码将不胜感激。
更新: 到目前为止我试过这个:
public KStream<String, String> joinedKStream(StreamsBuilder kStreamBuilder) {
KStream<String, String> streamX = kStreamBuilder.stream("x");
KStream<String, String> streamY = kStreamBuilder.stream("y");
KStream<String, String> streamZ = kStreamBuilder.stream("z");
KStream<String, String> stream = streamX
.leftJoin(streamY, (s, s2) -> {
logger.info("Join x + y: {}-{}", s, s2);
return s + s2;
}, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(20)))
.leftJoin(streamZ, (s, s2) -> {
logger.info("Join xy + z: {}-{}", s, s2);
return s + s2;
}, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(20)));
stream.to("merged");
return stream;
}
如果我只在主题 z 中发布一条消息“msg”,我期待一条由 nullnullmsg 组成的消息。然而,当唯一的输入是 z 时,它似乎并没有合并。
似乎需要在主题 x 中有一条消息才能使加入工作,但我需要从任何主题加入一个窗口。
更新 2
它使用 outerJoin 工作:
@Bean
public KStream<String, String> joinedKStream(StreamsBuilder kStreamBuilder) {
KStream<String, String> streamX = kStreamBuilder.stream("x");
KStream<String, String> streamY = kStreamBuilder.stream("y");
KStream<String, String> streamZ = kStreamBuilder.stream("z");
KStream<String, String> stream = streamX
.outerJoin(streamY, (s, s2) -> {
logger.info("Join x + y: {}-{}", s, s2);
return s + s2;
}, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(20)))
.outerJoin(streamZ, (s, s2) -> {
logger.info("Join xy + z: {}-{}", s, s2);
return s + s2;
}, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(20)));
stream.to("merged");
return stream;
}
但是还有一个问题:
我发送“嘿”给你和“你!”到 z。很长一段时间过去了,合并主题中什么也没有出现。
在某些时候,在发布更多消息后(几分钟后),执行连接并且 nullheyyo!出现在合并主题中。为什么会延迟?
更重要的是,如果任何主题中没有发布已知消息,则不会加入以前的消息。 ????