Kafka 消费者未分配给任何分区

问题描述 投票:0回答:2

我正在每半小时运行一次的服务中调用该类。我只想从主题列表中获取最后一条消息。 但随机我没有得到最新的记录。相反,在这种情况下,它会将偏移值显示为 0。此外,新分配的分区将作为空设置新分配的分区[]。

 try {
            consumer.subscribe(Collections.singletonList(topic));

            ConsumerRecords last= consumer.poll(Duration.ofSeconds(2).toMillis());

            consumer.assignment().forEach(System.out::println);
            AtomicLong maxTimestamp = new AtomicLong();
            AtomicReference<ConsumerRecord<String, String>> latestRecord = new AtomicReference<>();

            // get the last offsets for each partition
            consumer.endOffsets(consumer.assignment()).forEach((topicPartition, offset) -> {
                logger.info("offset: "+offset);

                // seek to the last offset of each partition
                consumer.seek(topicPartition, (offset==0) ? offset:offset - 1);
                topicoffset=topicoffset+offset;
                // poll to get the last record in each partition
                consumer.poll(Duration.ofSeconds(10).toMillis()).forEach(record -> {
                    
                    // the latest record in the 'topic' is the one with the highest timestamp
                    if (record.timestamp() > maxTimestamp.get()) {
                        maxTimestamp.set(record.timestamp());
                        latestRecord.set(record);
                    }
                });
            });
            
            ConsumerRecord<String, String> lastrecord =latestRecord.get();

消费者

[Consumer clientId=consumer-1, groupId=get-offset-command-test-1] Discovered coordinator kafka.test.com:9092 (id: 2147483647 rack: null)
 [Consumer clientId=consumer-1, groupId=get-offset-command-test-1] Revoking previously assigned partitions []
 [Consumer clientId=consumer-1, groupId=get-offset-command-test-1] (Re-)joining group
 [Consumer clientId=consumer-1, groupId=get-offset-command-test-1] Successfully joined group with generation 2
[Consumer clientId=consumer-1, groupId=get-offset-command-test-1] Setting newly assigned partitions []
java apache-kafka kafka-consumer-api
2个回答
0
投票

有两个包含上述类的 jar 实例,具有相同的组 id...主题不同,但组 id 相同...仅当两个实例都在运行时我才得到空列表。如果我关闭一个实例,则另一个工作正常。

当您启动“其他”实例订阅不同主题但在同一组中时,整个组将重新平衡。这将取消分配(并最终重新分配)分区给“第一个实例”。

两个实例不太可能同时具有空列表,但有可能一个实例会从组中获取所有分区,但仅限于已写入要订阅的主题。

当跨消费者实例使用相同的组时,最佳实践是始终订阅相同的主题。

由于没有明确的理由使用同一组从任何主题获取“最新”记录,我建议为该组使用随机 UUID,然后在使用者配置中禁用任何自动提交


0
投票

这可能与这个问题不完全相关,但对我来说,清除C:mp\kafka_logs中的日志并重新启动kafka服务器就可以了

© www.soinside.com 2019 - 2024. All rights reserved.