我目前正在使用 spring kafka 实现 kafka 监听器。 现在我有点困惑,因为我的侦听器有时无法分配给分区,因此无法从其中一个代理接收任何消息。
我有两个broker,每个broker都有一个主题,其中只包含一个分区。(主题名称相同)
经纪人1 | 经纪人2 |
---|---|
主题1 | 主题1 |
一个分区 | 一个分区 |
也许我对设置我的卡夫卡监听器从两个经纪人那里获取消息有错误的想法。不过,下面这句话猜对了吗?
Passing two brokers' host to listener
和Setting concurrency to 2
配置是否足够,因为我有两个分区(一个在broker1中,另一个在broker2中)?
创建带有注释的监听器
@KafkaListener(topics = "$Topic1" , containerFactory = "kafkaListenerContainerFactory")
将其并发设置为
2
factory.setConcurrency(2);
提供一系列经纪人
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, [123.123.123:9092, 211.211.211:9094]);
设置群组
props.put(ConsumerConfig.GROUP_ID_CONFIG, "testgroup");
以循环方式设置分区分配策略
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
能够从两个代理接收消息,但有时无法从两个代理之一接收任何消息
如果代理位于同一集群中,则应该可以正常工作。
如果代理不属于同一集群,则必须添加两个
@KafkaListener
注释,每个注释都使用指向每个集群的容器工厂。或者,您可以在注释之一上使用 properties
属性来覆盖 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
属性。