我正在每半小时运行一次的服务中调用该类。我只想从主题列表中获取最后一条消息。 但随机我没有得到最新的记录。相反,在这种情况下,它会将偏移值显示为 0。此外,新分配的分区将作为空设置新分配的分区[]。
try {
consumer.subscribe(Collections.singletonList(topic));
ConsumerRecords last= consumer.poll(Duration.ofSeconds(2).toMillis());
consumer.assignment().forEach(System.out::println);
AtomicLong maxTimestamp = new AtomicLong();
AtomicReference<ConsumerRecord<String, String>> latestRecord = new AtomicReference<>();
// get the last offsets for each partition
consumer.endOffsets(consumer.assignment()).forEach((topicPartition, offset) -> {
logger.info("offset: "+offset);
// seek to the last offset of each partition
consumer.seek(topicPartition, (offset==0) ? offset:offset - 1);
topicoffset=topicoffset+offset;
// poll to get the last record in each partition
consumer.poll(Duration.ofSeconds(10).toMillis()).forEach(record -> {
// the latest record in the 'topic' is the one with the highest timestamp
if (record.timestamp() > maxTimestamp.get()) {
maxTimestamp.set(record.timestamp());
latestRecord.set(record);
}
});
});
ConsumerRecord<String, String> lastrecord =latestRecord.get();
消费者
[Consumer clientId=consumer-1, groupId=get-offset-command-test-1] Discovered coordinator kafka.test.com:9092 (id: 2147483647 rack: null)
[Consumer clientId=consumer-1, groupId=get-offset-command-test-1] Revoking previously assigned partitions []
[Consumer clientId=consumer-1, groupId=get-offset-command-test-1] (Re-)joining group
[Consumer clientId=consumer-1, groupId=get-offset-command-test-1] Successfully joined group with generation 2
[Consumer clientId=consumer-1, groupId=get-offset-command-test-1] Setting newly assigned partitions []
有两个包含上述类的 jar 实例,具有相同的组 id...主题不同,但组 id 相同...仅当两个实例都在运行时我才得到空列表。如果我关闭一个实例,则另一个工作正常。
当您启动“其他”实例订阅不同主题但在同一组中时,整个组将重新平衡。这将取消分配(并最终重新分配)分区给“第一个实例”。
两个实例不太可能同时具有空列表,但有可能一个实例会从组中获取所有分区,但仅限于已写入要订阅的主题。
当跨消费者实例使用相同的组时,最佳实践是始终订阅相同的主题。
由于没有明确的理由使用同一组从任何主题获取“最新”记录,我建议为该组使用随机 UUID,然后在使用者配置中禁用任何自动提交
这可能与这个问题不完全相关,但对我来说,清除C:mp\kafka_logs中的日志并重新启动kafka服务器就可以了