我使用 JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(24)) 进行左连接时遇到问题

问题描述 投票:0回答:1

当我使用 JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofHours(24)) 进行左连接时遇到问题

这个方法是否应该将左流中的所有记录保留24小时才能在右流中找到匹配的记录?

我注意到许多记录在被消耗到左流中几个小时后就被发出了:

这是我的代码:

public KStream<String, KafkaJobEvent> kStream(
        @Value("${input-topic}") final String inputTopic,
        final StreamsBuilder builder) {

    final KStream<String, KafkaJobEvent> sourceStream = builder.stream(inputTopic,
                    Consumed.with(Serdes.ByteArray(), Serdes.ByteArray())
                            .withTimestampExtractor(new DefaultTimestampExtractor(logReader)))
           .selectKey((k, v) -> v.getID); // re-key the stream 


    final Map<String, KStream<String, KafkaJobEvent>> branches = sourceStream.split(Named.as(SUB_STREAM))
            .branch((key, value) -> value.getTarget().contains("A"), Branched.as("A"))
            .branch((key, value) -> value.getTarget().contains("B"), Branched.as("B"))
            .defaultBranch();


    final KStream<String, KafkaJobEvent> streamA = branches.get(SUB_STREAM + "A");
    final KStream<String, KafkaJobEvent> streamB = branches.get(SUB_STREAM +"B");


    final Duration windowDuration = Duration.ofHours(24);

    return streamA
            .leftJoin(
                    streamB,
                    new ValueJoiner<KafkaJobEvent, KafkaJobEvent, KafkaJobEventJoinedValue>() {
                        @Override
                        public KafkaJobEventJoinedValue apply(final KafkaJobEvent left, final KafkaJobEvent right) {
                           
                           return KafkaJobEventJoinedValue.builder()
                                    .left(left)
                                    .right(right)
                                    .build();
                        }
                    },
                    JoinWindows.ofTimeDifferenceWithNoGrace(windowDuration),
                    StreamJoined.with(
                            Serdes.String(), // Key serde
                            kafkaJobEventSerde, // Left value serde
                            kafkaJobEventSerde  // Right value serde
                    ))
            .filter((key, value) -> value.getRight() == null)
            .mapValues(KafkaJobEventJoinedValue::getLeft)
            .process(() -> new ContextualProcessor<String, KafkaJobEvent, String, KafkaJobEvent>() {
                @Override
                public void process(final Record<String, KafkaJobEvent> record) {
                    try {
                        log.info("the timestamp:{} for record key:{}, record value:{}",
                                record.timestamp(), record.key(), record.value());
                       // do something ...
                    } catch (final Exception e) {
                        log.error("Exception found while handling message for record value:{}", record.value(), e);
                    }

                }
            });



}
apache-kafka spring-kafka kafka-consumer-api apache-kafka-streams
1个回答
0
投票

这个方法是否应该将左流中的所有记录保留24小时才能在右流中找到匹配的记录?

是的,确实如此。不仅适用于左流,还适用于右流。

我注意到许多记录在被消耗到左流中几个小时后就被发出了:

好吧,如果左输入记录尚未加入,因为没有匹配的记录到达右侧,则不会立即发出任何内容。仅当左侧输入记录在 24 小时窗口内根本没有加入时,才会在 24 小时后发出

<key, <left,null>>
结果(您的加入窗口大小,假设您不使用宽限期)。

左侧结果无法立即发出,因为左侧记录可能仍会与未来的右侧记录连接,并且发出左连接加内连接结果将是不正确的。

请注意,24 小时是“时差”,因此该窗口可以查看过去和未来。如果您希望左侧记录仅与较旧的右侧记录连接,并在未找到匹配项时以正确的方式发出左连接结果,您可以使用

JoinWindows#before(...)
(或
#after(...)
)来定义“不对称”窗户。详情请参阅
JoinWindows
的JavaDocs。

有关更多详细信息,请查看https://www.confluence.io/events/kafka-summit-europe-2021/temporal-joins-in-kafka-streams-and-ksqldb/

© www.soinside.com 2019 - 2024. All rights reserved.