KStream-当第二个匹配值不存在时,KStream leftjoin 不会产生任何值

问题描述 投票:0回答:1

我们有这些 POJO:

@Data
@NoArgsConstructure
@AllArgsConstructure
class MyPost {
    private String content;
    private SeenInfo seenInfo;
}


@Data
@NoArgsConstructure
@AllArgsConstructure
class SeenInfo {
    private Integer seenCount;
    //other fields...
}

以及我们应用程序中的左连接过程:

@Bean
public Function<KStream<String, MyPost>, Function<KStream<String, SeenInfo>, KStream<String, MyPost>>> joinProcess(Map<String, String> schemaConfig) {
    return postStream ->
            seenInfoStream -> {
                SpecificAvroSerde<MyPost> postSerde = new SpecificAvroSerde<>();
                SpecificAvroSerde<SeenInfo> seenInfoSerde = new SpecificAvroSerde<>();
                postSerde.configure(schemaConfig, true);
                seenInfoSerde.configure(schemaConfig, true);
                return postStream.leftJoin(seenInfoStream,
                        (p, s) -> {
                            p.setSeenInfo(s);
                            return p;
                        },
                        JoinWindows.of(Duration.ofMinutes(5)),
                        StreamJoined.with(Serdes.String(),
                                postSerde,
                                seenInfoSerde));
            };
}

问题一:

MyPost 和 SeenInfo 匹配值在 5 分钟内出现时,加入过程会生成两条消息:

Message1: MyPost={ "content": "some text", "seenInfo": null}

Message2: MyPost={ "content": "some text", "seenInfo": { "seenCount": 1, ...}}

问题二:

如果 MyPost 存在而 SeenInfo 不存在,则加入过程将不会返回任何数据。
我们期望:

Message: MyPost={ "content": "some text", "seenInfo": null}

我们应该怎么做才能解决这个问题?

java apache-kafka left-join apache-kafka-streams spring-cloud-stream-binder-kafka
1个回答
0
投票

似乎您正在使用旧版本的 Kafka Streams,它面临着关于“虚假左连接结果”的第一个问题。它在 Kafka 3.1 版本中已修复(参见 https://issues.apache.org/jira/browse/KAFKA-10847)。

比较 https://cwiki.apache.org/confluence/display/KAFKA/KIP-633%3A+Deprecate+24-hour+Default+Grace+Period+for+Windowed+Operations+in+Streams

对于第二个问题,是的,您应该得到一个输出 - 如果您使用版本 3.1,丢失输出的唯一解释是您的输入流“停止/暂停”并且没有新数据到达 - 如果没有连接窗口不会关闭的新数据左连接结果会“卡住”,直到时间推进(基于记录时间戳,因此需要新的输入数据)。

© www.soinside.com 2019 - 2024. All rights reserved.