我有两个kafka监听器,如下图所示,同一个应用程序在两个实例上运行,如inc1和inc2。
@KafkaListener(topics = "foo1, foo2", groupId = foo.id, id = "foo")
public void fooTopics(@Header(KafkaHeaders.RECEIVED_TOPIC) String topic, String message, Acknowledgment acknowledgment) {
//processing
}
@KafkaListener(topics = "Bar1, Bar2", groupId = bar.id, id = "bar")
public void barTopics(@Header(KafkaHeaders.RECEIVED_TOPIC) String topic, String message, Acknowledgment acknowledgment) {
//processing
同一个应用程序运行在两个实例上,如inc1和inc2。有什么办法,如果我可以把foo监听器分配给inc1,把bar监听器分配给inc2,如果一个实例倒下了,两个监听器(foo和bar)都分配给正在运行的实例。
你可以使用@KafkaListener属性。自动启动从2.2开始引入。当一个实例死亡时,你可以在另一个实例中自动启动它,就像这样。
@Autowired
private KafkaListenerEndpointRegistry registry;
...
@KafkaListener(topics = "foo1, foo2", groupId = foo.id, id = "foo", autoStartup = "false")
public void fooTopics(@Header(KafkaHeaders.RECEIVED_TOPIC) String topic, String message, Acknowledgment acknowledgment) {
//processing
}
//Start up condition
registry.getListenerContainer("foo").start();