我使用 kafka 流和 spring-cloud-stream-binder-kafka-streams 来构建一个流,该流从主题接收有关 kubernetes 集群中运行的 pod 的信息,并用它构建容器映像与关联 pod 的映射。
public class ContainerPodMap {
private String image;
private List<String> pods;
}
基于此,我尝试加入包含扫描操作的 kstream。此连接会正确触发一次,但是一旦我通过生成到正在运行的 Pod 主题中添加新条目,LeftJoin 操作就永远不会再次触发,即使我希望具有新容器 Pod-Map 的新 Pod 事件应该创建一个 LeftJoin,其中scanAction 为空。
我实现了一些查看来看看发生了什么,看起来新条目只是在左连接前面“消失”了。所以 peek 容器映射: - 按预期工作。
@Bean
public BiFunction<KStream<String, PodEventDto>, KStream<String, ScanAction>, KStream<String, ScanAction>> events() {
return (podEventStream, scanActionKStream) -> {
return podEventStream
.process(PodEventStreamProcessor::new, "container-pod-map")
.peek((key, value) -> {
System.out.println("container-map: " + key + " - " + value.getImage());
})
.map((key, value) -> KeyValue.pair(key, value))
.leftJoin(scanActionKStream,
(key, containerPodMap, scanAction) -> {
/// never arrive here after the initial startup
},
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(SCAN_INTERVAL_IN_MINUTES)),
StreamJoined.with(Serdes.String(), new JsonSerde<>(ContainerPodMap.class), new JsonSerde<>(ScanAction.class)))
///...
}
public static class PodEventStreamProcessor extends ContextualProcessor<String, PodEventDto, String, ContainerPodMap> {
private KeyValueStore<String, ContainerPodMap> stateStore;
@Override
public void init(ProcessorContext<String, ContainerPodMap> context) {
super.init(context);
stateStore = context.getStateStore("container-pod-map");
}
@Override
public void process(Record<String, PodEventDto> podEventDtoRecord) {
String podId = podEventDtoRecord.value().getId();
for (PodContainerEventDto podContainer : podEventDtoRecord.value().getContainer()) {
// simplyfied to reduce complexity in example
context().forward(stringContainerPodMapRecord);
}
spring.cloud.stream.bindings.events-in-0.destination=pod-event
spring.cloud.stream.bindings.events-in-1.destination=scan-tasks
spring.cloud.stream.bindings.events-out-0.destination=scan-tasks
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.springframework.kafka.support.serializer.JsonSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
左连接结果仅在连接窗口关闭后发出。只要连接窗口打开,就不清楚左输入记录是否会产生内连接结果。仅当左输入记录不产生内连接结果时,才会将其作为左连接结果发出。
如果仅发送单个记录,则流时间(仅根据记录时间戳提前)不会提前,因此联接窗口不会关闭。仅当处理具有足够大时间戳的另一条记录时,使得连接窗口关闭,才会发出左连接结果。