为什么 KStreams 的 outerJoin 滑动窗口的不完整结果在新的不相关记录稍后到达之前不会被刷新?

问题描述 投票:0回答:0

我有三个主题和一个从中读取的 KStream 应用程序。

x、y 和 z 将收到消息,我想使用 KStream 应用程序中的窗口处理它们。

关键是,在同一个窗口中,有时我会同时收到来自一个主题、两个主题或三个主题的消息:

Window 1 ---> merged(z)
Window 2 ---> merged(x + z)
Window 3 ---> merged(x + y + z)
Window 4 ---> merged(x)
...

能否在一个窗口中同时加入两个以上的topic(KStreams)?

是否支持上图窗口的处理?

更新: 到目前为止我试过这个:

    public KStream<String, String> joinedKStream(StreamsBuilder kStreamBuilder) {
        KStream<String, String> streamX = kStreamBuilder.stream("x");
        KStream<String, String> streamY = kStreamBuilder.stream("y");
        KStream<String, String> streamZ = kStreamBuilder.stream("z");

        KStream<String, String> stream = streamX
                .leftJoin(streamY, (s, s2) -> {
                    logger.info("Join x + y: {}-{}", s, s2);
                    return s + s2;
                }, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(20)))
                .leftJoin(streamZ, (s, s2) -> {
                    logger.info("Join xy + z: {}-{}", s, s2);
                    return s + s2;
                }, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(20)));
        stream.to("merged");
        return stream;
    }

如果我只在主题 z 中发布一条消息“msg”,我期待一条由 nullnullmsg 组成的消息。然而,当唯一的输入是 z 时,它似乎并没有合并。

似乎需要在主题 x 中有一条消息才能使加入工作,但我需要从任何主题加入一个窗口。

更新 2

逻辑上我需要一个 outerJoin:

    @Bean
    public KStream<String, String> joinedKStream(StreamsBuilder kStreamBuilder) {
        KStream<String, String> streamX = kStreamBuilder.stream("x");
        streamX.peek((k, v) -> logger.info("x - key {} - value {}", k, v));
        KStream<String, String> streamY = kStreamBuilder.stream("y");
        streamY.peek((k, v) -> logger.info("y - key {} - value {}", k, v));
        KStream<String, String> streamZ = kStreamBuilder.stream("z");
        streamZ.peek((k, v) -> logger.info("z - key {} - value {}", k, v));

        //JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5)).grace(Duration.ofSeconds(1));
        JoinWindows joinWindow = JoinWindows.ofTimeDifferenceAndGrace(Duration.ofSeconds(5), Duration.ofSeconds(10));

        KStream<String, String> stream = streamX
                .outerJoin(streamY, (k, s, s2) -> {
                    logger.info("Join x + y: key {} - values {}-{}", k, s, s2);
                    return s + s2;
                }, joinWindow)
                .outerJoin(streamZ, (k, s, s2) -> {
                    logger.info("Join xy + z: key {} - values {}-{}", k, s, s2);
                    return s + s2;
                }, joinWindow);
        stream.foreach((key, value) -> logger.info("Merged key {} - value {}", key, value));
        stream.to("merged");
        return stream;

但是还有一个问题:

我发送“嘿”到主题 y 和“你!”主题 z。很长一段时间过去了,合并主题中什么也没有出现。

在某些时候,在发布更多消息后(几分钟后),执行连接并且

nullheyyo!
出现在合并的主题中。为什么会延迟?

更重要的是,如果在任何主题中都没有发布已知消息,则不会加入以前的消息。我预计在窗口期结束后,合并将自动刷新。

这是一个正在发生的事情的例子:

  • 我1:1发布主题x
  • xKStream 日志:
    x - key 1 - value 1
  • 在窗口持续时间 + 宽限期(15 秒)之后没有加入发生。
  • 我发布2:2主题x
  • 日志:
x - key 2 - value 2
Join x + y: key 1 - values 1-null
  • 如您所见,关于第二次连接还没有任何内容。
  • 终于当我发布另一张唱片 3:3 to x:
x - key 3 - value 3
Join x + y: key 2 - values 2-null
Merged key 1 - value 1nullnull

最后我们得到合并后的日志,将加入的记录刷入合并后的主题

???

java apache-kafka apache-kafka-streams
© www.soinside.com 2019 - 2024. All rights reserved.