如何在暂停模式下部署kafka使用者,直到我发出开始消耗消息的信号

问题描述 投票:0回答:1

我正在使用spring-kafka 2.2.8,并试图了解是否存在将kafka使用者部署为暂停模式的选项,直到我发信号开始使用这些消息为止。请提出建议。

我在下面的文章中看到,我们可以暂停并启动使用者,但是我需要使用者在部署时处于暂停模式。how to pause and resume @KafkaListener using spring-kafka

spring-kafka
1个回答
1
投票

@KafkaListener(id = "foo", ..., autoStartup = "false")

然后在准备好时使用KafkaListenerEndpointRegistry启动它

registry.getListenerContainer("foo").start();

以暂停模式启动它没有多大意义,但是您可以这样做...

@SpringBootApplication
public class So62329274Application {

    public static void main(String[] args) {
        SpringApplication.run(So62329274Application.class, args);
    }


    @KafkaListener(id = "so62329274", topics = "so62329274", autoStartup = "false")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so62329274").partitions(1).replicas(1).build();
    }


    @Bean
    public ApplicationRunner runner(KafkaListenerEndpointRegistry registry, KafkaTemplate<String, String> template) {
        return args -> {
            template.send("so62329274", "foo");
            registry.getListenerContainer("so62329274").pause();
            registry.getListenerContainer("so62329274").start();
            System.in.read();
            registry.getListenerContainer("so62329274").resume();
        };
    }

}

分配分区后,您将看到类似这样的日志消息:

由于重新平衡,卡夫卡恢复了暂停的消费者;使用者再次暂停,因此初始poll()将永远不会返回任何记录

© www.soinside.com 2019 - 2024. All rights reserved.