我正在使用spring-kafka来吸收来自两个Kafka主题的消息,它们发送的消息格式如下。
@KafkaListener(topics = {"topic_country1", "topic_country2"}, groupId = KafkaUtils.MESSAGE_GROUP)
public void onCustomerMessage(String message, Acknowledgment ack) throws Exception {
log.info("Message : {} is received", message);
ack.acknowledge();
}
您可以设置concurrency
属性以运行更多线程;但是每个分区只能由一个线程处理。要增加并发性,必须增加每个主题中的分区数。在同一侦听器中侦听多个主题时,如果这些主题只有一个分区,那么除非更改kafka使用者分区分配器,否则您可能无法获得所需的并发性。
当收听多个主题时,默认分区分布可能不是您期望的。例如,如果您有三个主题,每个主题有五个分区,并且要使用concurrency = 15,则只会看到五个活动的使用者,每个使用者都为每个主题分配了一个分区,而其他10个使用者处于空闲状态。这是因为默认的Kafka PartitionAssignor是RangeAssignor(请参阅其Javadoc)。对于这种情况,您可能需要考虑使用RoundRobinAssignor,它会在所有使用者中分配分区。然后,为每个使用者分配一个主题或分区。 ...