我有一个 SpringBoot 应用程序,其中使用带有主题和 groupId 的 @KafkaListener 注释。我的听众需要听两个主题。当听一个主题时,任何一个都可以正常工作
@KafkaListener(topics = "topic1", groupId = "local")
或
@KafkaListener(topics = "topic2", groupId = "local")
但是,当我像这样将两者结合起来时(这是我对如何在一个听众上执行多个主题的理解):
@KafkaListener(topics = {"topic1,topic2"}, groupId = "local")
,我遇到如下滚动异常:
{
"app": "myApp",
"@timestamp": "2022-12-20T12:01:55.004Z",
"userId": "",
"hostname": "",
"ipAddress": "",
"logger": "org.apache.kafka.clients.Metadata",
"level": "ERROR",
"thread": "org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1",
"message": "[Consumer clientId=consumer-local-1, groupId=local] Metadata response reported invalid topics [topic1,topic2]",
"class": "org.apache.kafka.clients.Metadata",
"method": "checkInvalidTopics",
"file": "Metadata.java",
"line": 294
}
{
"app": "myApp",
"@timestamp": "2022-12-20T12:01:55.004Z",
"userId": "",
"hostname": "",
"ipAddress": "",
"logger": "org.apache.kafka.clients.consumer.internals.AbstractCoordinator",
"level": "INFO",
"thread": "org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1",
"message": "[Consumer clientId=consumer-local-1, groupId=local] Rebalance failed.",
"class": "org.apache.kafka.clients.consumer.internals.AbstractCoordinator",
"method": "joinGroupIfNeeded",
"file": "AbstractCoordinator.java",
"line": 470,
"stack": "org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [topic1,topic2]\n"
}
任何关于我做错了什么的想法都非常感谢。谢谢你
您可以通过将多个主题传递到逗号分隔的对象中来监听同一个 @KafkaListener 中的多个主题。在你的情况下应该是
@KafkaListener(topics = {"topic1", "topic2"}, groupId = "local")
试试这个:
{"topic1", "topic2"}