一直尝试使用Vertx在Java中编写kafka使用者。
我需要将自动提交设置为false(特定用例)。
下面是执行显式轮询的代码
consumer.subscribe("test", ar -> {
if (ar.succeeded()) {
System.out.println("Consumer subscribed");
vertx.setPeriodic(1000, timerId -> {
consumer.poll(100, ar1 -> {
if (ar1.succeeded()) {
KafkaConsumerRecords<String, String> records = ar1.result();
for (int i = 0; i < records.size(); i++) {
KafkaConsumerRecord<String, String> record = records.recordAt(i);
System.out.println("key=" + record.key() + ",value=" + record.value() +
",partition=" + record.partition() + ",offset=" + record.offset());
}
}
});
});
}});
并且,要手动提交:
consumer.commit(ar -> {
if (ar.succeeded()) {
System.out.println("Last read message offset committed");
}
});
我的问题是,如果将轮询频率设置为1000ms,并且提交是手动的,那么如果在1000ms内未处理该消息会发生什么?
将在处理第一组消息之前进行下一次轮询吗?如果是,它将再次获取相同的消息集(尚未提交)还是更新的消息集?
查看KafkaConsumer#poll
的文档:
在每次轮询中,消费者将尝试使用上次消耗的偏移量作为起始偏移量,并按顺序获取。可以通过seek(TopicPartition,long)手动设置最后消耗的偏移量,或者自动将其设置为已订阅分区列表的最后提交的偏移量
最后消耗的偏移量是指KafkaConsumer#poll
的内部状态,而不是其提交的状态。这意味着它不会再次获取相同的消息,但是会获取接下来的100条消息。