在另一个 KafkaListner(压缩主题 2)完成读取消息后启动 KafkaListner(主题 1)

问题描述 投票:0回答:2

我有一个场景,我必须从头开始阅读压缩主题(主题 2)中的所有消息。我必须将所有这些消息保存在内存中,这将充当查找/缓存。

我有另一个主题(主题 1),一旦消息到达,我必须从我们上面创建的缓存中进行一些查找并进一步处理。

如何确保在启动过程中,主题 1 的 KafkaListener 不会启动,直到主题 2 的 KafkaListener 读取缓存中加载的所有消息?

spring-kafka
2个回答
1
投票

2.7.3有新功能。

https://docs.spring.io/spring-kafka/docs/current/reference/html/#sequencing

一个常见的用例是在另一个侦听器消耗完主题中的所有记录后启动侦听器。例如,您可能希望在处理其他主题的记录之前将一个或多个压缩主题的内容加载到内存中。从版本 2.7.3 开始,引入了新组件

ContainerGroupSequencer 
。它使用 @KafkaListener containerGroup 属性将容器分组在一起,并在当前组中的所有容器都空闲时启动下一组中的容器。

最好用一个例子来说明。

@KafkaListener(id = "listen1", topics = "topic1", containerGroup = "g1", concurrency = "2")
public void listen1(String in) {
}

@KafkaListener(id = "listen2", topics = "topic2", containerGroup = "g1", concurrency = "2")
public void listen2(String in) {
}

@KafkaListener(id = "listen3", topics = "topic3", containerGroup = "g2", concurrency = "2")
public void listen3(String in) {
}

@KafkaListener(id = "listen4", topics = "topic4", containerGroup = "g2", concurrency = "2")
public void listen4(String in) {
}

@Bean
ContainerGroupSequencer sequencer(KafkaListenerEndpointRegistry registry) {
    return new ContainerGroupSequencer(registry, 5000, "g1", "g2");
}

这里,我们有 4 个听众,分为两组,g1 和 g2。

在应用程序上下文初始化期间,排序器将所提供组中所有容器的

autoStartup
属性设置为 false。它还将任何容器(还没有一组)的
idleEventInterval
设置为提供的值(在本例中为 5000ms)。然后,当应用程序上下文启动定序器时,将启动第一组中的容器。当收到
ListenerContainerIdleEvent
时,每个容器中的每个单独的子容器都会停止。当
ConcurrentMessageListenerContainer
中的所有子容器停止时,父容器也停止。当一组中的所有容器都停止后,下一组中的容器将启动。组或组中容器的数量没有限制。

默认情况下,最后一组(上面的g2)中的容器在空闲时不会停止。要修改该行为,请在定序器上将 stopLastGroupWhenIdle 设置为 true。

对于早期版本,您必须自己实现排序;请参阅这个答案


0
投票

我有一个场景,首先使用主题 1 中的消息,然后使用主题 1 中的所有消息,然后使用主题 2 消息。正如您在上面的评论中提到的,我使用了 ContainerGroupSequencer。

正如预期的那样,第 1 组在收到所有消息后停止并移至第 2 组。但就我而言,我不想在阅读事件后停止第 1 组。只是想先读取主题 1 的消息,然后再读取主题 2 的消息。

© www.soinside.com 2019 - 2024. All rights reserved.