我正在使用spring-kafka 2.2.8,并试图了解是否存在将kafka使用者部署为暂停模式的选项,直到我发信号开始使用这些消息为止。请提出建议。
我在下面的文章中看到,我们可以暂停并启动使用者,但是我需要使用者在部署时处于暂停模式。how to pause and resume @KafkaListener using spring-kafka
@KafkaListener(id = "foo", ..., autoStartup = "false")
然后在准备好时使用KafkaListenerEndpointRegistry
启动它
registry.getListenerContainer("foo").start();
以暂停模式启动它没有多大意义,但是您可以这样做...
@SpringBootApplication
public class So62329274Application {
public static void main(String[] args) {
SpringApplication.run(So62329274Application.class, args);
}
@KafkaListener(id = "so62329274", topics = "so62329274", autoStartup = "false")
public void listen(String in) {
System.out.println(in);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so62329274").partitions(1).replicas(1).build();
}
@Bean
public ApplicationRunner runner(KafkaListenerEndpointRegistry registry, KafkaTemplate<String, String> template) {
return args -> {
template.send("so62329274", "foo");
registry.getListenerContainer("so62329274").pause();
registry.getListenerContainer("so62329274").start();
System.in.read();
registry.getListenerContainer("so62329274").resume();
};
}
}
分配分区后,您将看到类似这样的日志消息:
由于重新平衡,卡夫卡恢复了暂停的消费者;使用者再次暂停,因此初始poll()将永远不会返回任何记录