spring-kafka KafkaListener中的并行处理和自动缩放

问题描述 投票:0回答:1

我正在使用spring-kafka来吸收来自两个Kafka主题的消息,它们发送的消息格式如下。

    @KafkaListener(topics = {"topic_country1", "topic_country2"}, groupId = KafkaUtils.MESSAGE_GROUP)
    public void onCustomerMessage(String message, Acknowledgment ack) throws Exception {
        log.info("Message : {}  is received", message);
        ack.acknowledge();
    }
  • KafkaListener可以根据它在两个主题中自己和并行处理的消息所侦听的主题数目来分配使用者线程的数目吗?还是它不支持并行处理,并且消息必须在主题中等待,直到处理完一条消息为止?
  • 如果主题中的消息数量更多,我需要自动缩放微服务以启动新实例(直到分区数量)。从KafkaListener的角度来看,我可以依靠哪些参数(CPU,内存)来找出主题中的消息数量更高? (即在API中,我可以通过监视HTTP延迟来自动扩展服务)
spring-boot apache-kafka kafka-consumer-api spring-kafka autoscaling
1个回答
0
投票

您可以设置concurrency属性以运行更多线程;但是每个分区只能由一个线程处理。要增加并发性,必须增加每个主题中的分区数。在同一侦听器中侦听多个主题时,如果这些主题只有一个分区,那么除非更改kafka使用者分区分配器,否则您可能无法获得所需的并发性。

请参见https://docs.spring.io/spring-kafka/docs/2.5.0.RELEASE/reference/html/#using-ConcurrentMessageListenerContainer

当收听多个主题时,默认分区分布可能不是您期望的。例如,如果您有三个主题,每个主题有五个分区,并且要使用concurrency = 15,则只会看到五个活动的使用者,每个使用者都为每个主题分配了一个分区,而其他10个使用者处于空闲状态。这是因为默认的Kafka PartitionAssignor是RangeAssignor(请参阅其Javadoc)。对于这种情况,您可能需要考虑使用RoundRobinAssignor,它会在所有使用者中分配分区。然后,为每个使用者分配一个主题或分区。 ...

© www.soinside.com 2019 - 2024. All rights reserved.