Spring Kafka-选择@TopicPartition会停用组中的消耗者

问题描述 投票:2回答:1

我有两个使用者组firstsecond使用Spring Boot版本[[2.1.8订阅了my-topic主题。我使用每个单独的消费者组提供2种服务。

使用minimal setup我定义了一个侦听器:

@KafkaListener(topics = "my-topic") public void consume(@Payload String message) { logger.info(String.format("#### -> Consumed message -> %s", message)); }

使用kafka-consumer-groups的列表显示以下内容:

root@8d49c1b2c3bf:/# kafka-consumer-groups --bootstrap-server localhost:9092 --describe --all-groups GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID first my-topic 2 0 0 0 consumer-2-42b222ac-998a-4629-8358-1846c85f37e7 /172.18.0.9 consumer-2 first my-topic 0 0 0 0 consumer-2-42b222ac-998a-4629-8358-1846c85f37e7 /172.18.0.9 consumer-2 first my-topic 1 0 0 0 consumer-2-42b222ac-998a-4629-8358-1846c85f37e7 /172.18.0.9 consumer-2 GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID second my-topic 0 0 0 0 consumer-2-294ef783-2cd8-450b-8ac0-9e8a2f287df8 /172.18.0.8 consumer-2 second my-topic 1 0 0 0 consumer-2-294ef783-2cd8-450b-8ac0-9e8a2f287df8 /172.18.0.8 consumer-2 second my-topic 2 0 0 0 consumer-2-294ef783-2cd8-450b-8ac0-9e8a2f287df8 /172.18.0.8 consumer-2

我希望first组中的使用者仅使用0和2个分区,所以我尝试通过侦听器实现它:

@KafkaListener(topicPartitions = { @TopicPartition( topic = "my-topic", partitions = { "0", "2" } )} ) public void consume(@Payload String message) { logger.info(String.format("#### -> Consumed message -> %s", message)); }

为什么使用相同命令的清单除了first组的分区1以外,没有显示其他内容? CONSUMER-ID值也为空,并且@KafkaListener对于first组完全接收

no

消息(second组中的侦听器工作相同)。如何解决?Consumer group 'first' has no active members. GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID first my-topic 2 0 0 0 - - - first my-topic 0 0 0 0 - - - GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID second my-topic 0 0 0 0 consumer-2-294ef783-2cd8-450b-8ac0-9e8a2f287df8 /172.18.0.8 consumer-2 second my-topic 1 0 0 0 consumer-2-294ef783-2cd8-450b-8ac0-9e8a2f287df8 /172.18.0.8 consumer-2 second my-topic 2 0 0 0 consumer-2-294ef783-2cd8-450b-8ac0-9e8a2f287df8 /172.18.0.8 consumer-2
java spring spring-boot apache-kafka publish-subscribe
1个回答
1
投票
Manually Assigning All Partitions由于您正在将使用者线程分配给特定分区,因此Kafka将使用assign()方法,而不使用组协调。每个使用者即使与另一个使用者共享groupId也可以独立执行操作]

假设您要始终从所有分区读取所有记录(例如,当使用压缩主题加载分布式缓存时,手动分配分区而不使用Kafka的组管理会很有用。

但是检查消费者位置的命令将仅使用组协调显示消费者的位置

Checking consumer position

有时,查看消费者的位置很有用。

我们有一个工具可以显示所有使用者在使用者组中的位置,以及他们在日志末尾之后的位置

。要在名为my-group的使用者组上运行此工具,该使用者组将使用名为my-topic的主题KafkaConsumer

手动分区分配不使用组协调,因此使用者故障不会导致重新分配分配的分区。即使每个使用者与另一个使用者共享一个groupId,每个使用者也可以独立行动。为避免偏移提交冲突,通常应确保groupId对于每个使用者实例都是唯一的。

并且监听器不使用消息的最终原因是由于偏移量,默认情况下,偏移量设置为最新,因此您需要指定偏移量位置,尤其是这样

@KafkaListener(id = "thing2", topicPartitions = { @TopicPartition(topic = "topic1", partitions = { "0", "1" }), @TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100")) }) public void listen(ConsumerRecord<?, ?> record) { ... }

有关offset的更多信息

第一个构造函数采用一个TopicPartitionOffset参数数组,以明确指示容器有关使用哪个分区的方法(通过使用Consumer Assign()方法),并带有可选的初始偏移量。默认情况下,正值是绝对偏移量。默认情况下,负值相对于分区中当前的最后一个偏移量。提供了一个TopicPartitionOffset的构造函数,该构造函数带有一个附加的布尔参数。如果为真,则初始偏移量(正值或负值)相对于此使用者的当前位置。在容器启动时应用偏移。

注意:

您可以在partitions或partitionOffsets属性中指定每个分区,但不能同时指定两者。
© www.soinside.com 2019 - 2024. All rights reserved.