Vertx Kafka通过明确的轮询自动提交

问题描述 投票:1回答:1

一直尝试使用Vertx在Java中编写kafka使用者。

我需要将自动提交设置为false(特定用例)。

下面是执行显式轮询的代码

consumer.subscribe("test", ar -> {

if (ar.succeeded()) {
System.out.println("Consumer subscribed");

vertx.setPeriodic(1000, timerId -> {

  consumer.poll(100, ar1 -> {

    if (ar1.succeeded()) {

      KafkaConsumerRecords<String, String> records = ar1.result();
      for (int i = 0; i < records.size(); i++) {
        KafkaConsumerRecord<String, String> record = records.recordAt(i);
        System.out.println("key=" + record.key() + ",value=" + record.value() +
          ",partition=" + record.partition() + ",offset=" + record.offset());
      }
    }
  });

});

}});

并且,要手动提交:

consumer.commit(ar -> {

 if (ar.succeeded()) {
 System.out.println("Last read message offset committed");
 }
 });

我的问题是,如果将轮询频率设置为1000ms,并且提交是手动的,那么如果在1000ms内未处理该消息会发生什么?

将在处理第一组消息之前进行下一次轮询吗?如果是,它将再次获取相同的消息集(尚未提交)还是更新的消息集?

java apache-kafka rx-java2 kafka-consumer-api vert.x
1个回答
0
投票

查看KafkaConsumer#poll的文档:

在每次轮询中,消费者将尝试使用上次消耗的偏移量作为起始偏移量,并按顺序获取。可以通过seek(TopicPartition,long)手动设置最后消耗的偏移量,或者自动将其设置为已订阅分区列表的最后提交的偏移量

最后消耗的偏移量是指KafkaConsumer#poll的内部状态,而不是其提交的状态。这意味着它不会再次获取相同的消息,但是会获取接下来的100条消息。

© www.soinside.com 2019 - 2024. All rights reserved.