我有两个使用者组first
和second
使用Spring Boot版本[[2.1.8订阅了my-topic
主题。我使用每个单独的消费者组提供2种服务。
@KafkaListener(topics = "my-topic")
public void consume(@Payload String message) {
logger.info(String.format("#### -> Consumed message -> %s", message));
}
使用kafka-consumer-groups
的列表显示以下内容:
root@8d49c1b2c3bf:/# kafka-consumer-groups --bootstrap-server localhost:9092 --describe --all-groups GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID first my-topic 2 0 0 0 consumer-2-42b222ac-998a-4629-8358-1846c85f37e7 /172.18.0.9 consumer-2 first my-topic 0 0 0 0 consumer-2-42b222ac-998a-4629-8358-1846c85f37e7 /172.18.0.9 consumer-2 first my-topic 1 0 0 0 consumer-2-42b222ac-998a-4629-8358-1846c85f37e7 /172.18.0.9 consumer-2 GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID second my-topic 0 0 0 0 consumer-2-294ef783-2cd8-450b-8ac0-9e8a2f287df8 /172.18.0.8 consumer-2 second my-topic 1 0 0 0 consumer-2-294ef783-2cd8-450b-8ac0-9e8a2f287df8 /172.18.0.8 consumer-2 second my-topic 2 0 0 0 consumer-2-294ef783-2cd8-450b-8ac0-9e8a2f287df8 /172.18.0.8 consumer-2
我希望first
组中的使用者仅使用0和2个分区,所以我尝试通过侦听器实现它:
@KafkaListener(topicPartitions = { @TopicPartition( topic = "my-topic", partitions = { "0", "2" } )} ) public void consume(@Payload String message) { logger.info(String.format("#### -> Consumed message -> %s", message)); }
为什么使用相同命令的清单除了消息(first
组的分区1以外,没有显示其他内容?CONSUMER-ID
值也为空,并且@KafkaListener
对于first
组完全接收no
second
组中的侦听器工作相同)。如何解决?Consumer group 'first' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
first my-topic 2 0 0 0 - - -
first my-topic 0 0 0 0 - - -
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
second my-topic 0 0 0 0 consumer-2-294ef783-2cd8-450b-8ac0-9e8a2f287df8 /172.18.0.8 consumer-2
second my-topic 1 0 0 0 consumer-2-294ef783-2cd8-450b-8ac0-9e8a2f287df8 /172.18.0.8 consumer-2
second my-topic 2 0 0 0 consumer-2-294ef783-2cd8-450b-8ac0-9e8a2f287df8 /172.18.0.8 consumer-2
assign()
方法,而不使用组协调。每个使用者即使与另一个使用者共享groupId也可以独立执行操作]假设您要始终从所有分区读取所有记录(例如,当使用压缩主题加载分布式缓存时,手动分配分区而不使用Kafka的组管理会很有用。但是检查消费者位置的命令将仅使用组协调显示消费者的位置
有时,查看消费者的位置很有用。我们有一个工具可以显示所有使用者在使用者组中的位置,以及他们在日志末尾之后的位置
。要在名为my-group的使用者组上运行此工具,该使用者组将使用名为my-topic的主题KafkaConsumer
手动分区分配不使用组协调,因此使用者故障不会导致重新分配分配的分区。即使每个使用者与另一个使用者共享一个groupId,每个使用者也可以独立行动。为避免偏移提交冲突,通常应确保groupId对于每个使用者实例都是唯一的。
并且监听器不使用消息的最终原因是由于偏移量,默认情况下,偏移量设置为最新,因此您需要指定偏移量位置,尤其是这样
@KafkaListener(id = "thing2", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
有关offset的更多信息
第一个构造函数采用一个TopicPartitionOffset参数数组,以明确指示容器有关使用哪个分区的方法(通过使用Consumer Assign()方法),并带有可选的初始偏移量。默认情况下,正值是绝对偏移量。默认情况下,负值相对于分区中当前的最后一个偏移量。提供了一个TopicPartitionOffset的构造函数,该构造函数带有一个附加的布尔参数。如果为真,则初始偏移量(正值或负值)相对于此使用者的当前位置。在容器启动时应用偏移。您可以在partitions或partitionOffsets属性中指定每个分区,但不能同时指定两者。注意: