我正在尝试从一个分区主题安排我的消费过程。我可以使用endpointlistenerregistry.start()
启动它,但是我想在消耗完当前分区中的所有消息后即当到达当前分区中的最后一个偏移量时停止它。我完成消费并关闭后,就可以完成对该主题的制作。在启动调度程序并停止消费者之前,如何确保我已阅读所有消息?我正在为消费者使用@Kafkalistener
。
要读取直到最后一个偏移量,只需轮询直到获得空记录。
您可以在使用结束时调用kafkaConsumer.pause()。在下一个计划期间,需要调用kafkaConsumer.resume()。
暂停从请求的分区中提取。将来对poll(Duration)的调用将不会从这些分区中返回任何记录,除非已使用resume(Collection)恢复了它们。请注意,此方法不会影响分区订阅。特别是,使用自动分配时,它不会导致组重新平衡。
这样的东西,
List<TopicPartition> topicPartitions = new ArrayList<>();
void scheduleProcess() {
topicPartitions = ... // assign partition info for this
kafkaConsumer.resume(topicPartitions)
while(true) {
ConsumerRecords<String, Object> events = kafkaConsumer.poll(Duration.ofMillis(1000));
if(!events.isEmpty()) {
// processing logic
} else {
kafkaConsumer.pause(List.of(topicPartition));
break;
}
}
}