KStream-KStream Join 不触发左连接

问题描述 投票:0回答:1

我使用 kafka 流和 spring-cloud-stream-binder-kafka-streams 来构建一个流,该流从主题接收有关 kubernetes 集群中运行的 pod 的信息,并用它构建容器映像与关联 pod 的映射。

public class ContainerPodMap {

    private String image;
    private List<String> pods;

}

基于此,我尝试加入包含扫描操作的 kstream。此连接会正确触发一次,但是一旦我通过生成到正在运行的 Pod 主题中添加新条目,LeftJoin 操作就永远不会再次触发,即使我希望具有新容器 Pod-Map 的新 Pod 事件应该创建一个 LeftJoin,其中scanAction 为空。

我实现了一些查看来看看发生了什么,看起来新条目只是在左连接前面“消失”了。所以 peek 容器映射: - 按预期工作。

@Bean
    public BiFunction<KStream<String, PodEventDto>, KStream<String, ScanAction>, KStream<String, ScanAction>> events() {
        return (podEventStream, scanActionKStream) -> {
            return podEventStream
                    .process(PodEventStreamProcessor::new, "container-pod-map")
                    .peek((key, value) -> {
                        System.out.println("container-map: " + key + " - " + value.getImage());
                    })
                    .map((key, value) -> KeyValue.pair(key, value))
                    .leftJoin(scanActionKStream,
                             (key, containerPodMap, scanAction) -> {
                                /// never arrive here after the initial startup
                             },
                              JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(SCAN_INTERVAL_IN_MINUTES)),
                             StreamJoined.with(Serdes.String(), new JsonSerde<>(ContainerPodMap.class), new JsonSerde<>(ScanAction.class)))
    ///...
    }

public static class PodEventStreamProcessor extends ContextualProcessor<String, PodEventDto, String, ContainerPodMap> {

        private KeyValueStore<String, ContainerPodMap> stateStore;

        @Override
        public void init(ProcessorContext<String, ContainerPodMap> context) {
            super.init(context);
            stateStore = context.getStateStore("container-pod-map");
        }
    
        @Override
        public void process(Record<String, PodEventDto> podEventDtoRecord) {
            String podId = podEventDtoRecord.value().getId();
            for (PodContainerEventDto podContainer : podEventDtoRecord.value().getContainer()) {
                // simplyfied to reduce complexity in example
                context().forward(stringContainerPodMapRecord);
            }

                     

spring.cloud.stream.bindings.events-in-0.destination=pod-event
spring.cloud.stream.bindings.events-in-1.destination=scan-tasks
spring.cloud.stream.bindings.events-out-0.destination=scan-tasks
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.springframework.kafka.support.serializer.JsonSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring-kafka apache-kafka-streams
1个回答
0
投票

左连接结果仅在连接窗口关闭后发出。只要连接窗口打开,就不清楚左输入记录是否会产生内连接结果。仅当左输入记录不产生内连接结果时,才会将其作为左连接结果发出。

如果仅发送单个记录,则流时间(仅根据记录时间戳提前)不会提前,因此联接窗口不会关闭。仅当处理具有足够大时间戳的另一条记录时,使得连接窗口关闭,才会发出左连接结果。

参考https://www.confluence.io/events/kafka-summit-europe-2021/temporal-joins-in-kafka-streams-and-ksqldb/

© www.soinside.com 2019 - 2024. All rights reserved.