是否可以使用窗口加入可选的多个(3+)KStreams?

问题描述 投票:0回答:0

我有三个主题和一个从中读取的 KStream 应用程序。

x、y 和 z 将收到消息,我想使用 KStream 应用程序中的窗口处理它们。

关键是,在同一个窗口中,有时我会同时收到来自一个主题、两个主题或三个主题的消息:

Window 1 ---> z
Window 2 ---> x + z
Window 3 ---> x + y + z
Window 4 ---> x
...

能否在一个窗口中同时加入两个以上的topic(KStreams)?

是否支持上图窗口的处理?

一些显示它的代码将不胜感激。

更新: 到目前为止我试过这个:

    public KStream<String, String> joinedKStream(StreamsBuilder kStreamBuilder) {
        KStream<String, String> streamX = kStreamBuilder.stream("x");
        KStream<String, String> streamY = kStreamBuilder.stream("y");
        KStream<String, String> streamZ = kStreamBuilder.stream("z");

        KStream<String, String> stream = streamX
                .leftJoin(streamY, (s, s2) -> {
                    logger.info("Join x + y: {}-{}", s, s2);
                    return s + s2;
                }, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(20)))
                .leftJoin(streamZ, (s, s2) -> {
                    logger.info("Join xy + z: {}-{}", s, s2);
                    return s + s2;
                }, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(20)));
        stream.to("merged");
        return stream;
    }

如果我只在主题 z 中发布一条消息“msg”,我期待一条由 nullnullmsg 组成的消息。然而,当唯一的输入是 z 时,它似乎并没有合并。

似乎需要在主题 x 中有一条消息才能使加入工作,但我需要从任何主题加入一个窗口。

更新 2

它使用 outerJoin 工作:

    @Bean
    public KStream<String, String> joinedKStream(StreamsBuilder kStreamBuilder) {
        KStream<String, String> streamX = kStreamBuilder.stream("x");
        KStream<String, String> streamY = kStreamBuilder.stream("y");
        KStream<String, String> streamZ = kStreamBuilder.stream("z");

        KStream<String, String> stream = streamX
                .outerJoin(streamY, (s, s2) -> {
                    logger.info("Join x + y: {}-{}", s, s2);
                    return s + s2;
                }, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(20)))
                .outerJoin(streamZ, (s, s2) -> {
                    logger.info("Join xy + z: {}-{}", s, s2);
                    return s + s2;
                }, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(20)));
        stream.to("merged");
        return stream;
    }

但是还有一个问题:

我发送“嘿”给你和“你!”到 z。很长一段时间过去了,合并主题中什么也没有出现。

在某些时候,在发布更多消息后(几分钟后),执行连接并且 nullheyyo!出现在合并主题中。为什么会延迟?

更重要的是,如果任何主题中没有发布已知消息,则不会加入以前的消息。 ????

java apache-kafka apache-kafka-streams
© www.soinside.com 2019 - 2024. All rights reserved.