卡夫卡消费者。 commitSync VS commitAsync

问题描述 投票:2回答:2

https://www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html#callout_kafka_consumers__reading_data_from_kafka_CO2-1报价

其缺点是,虽然commitSync()将重试所述提交,直到它成功或遇到非retriable失败,commitAsync()将不会重试。

这句话是我也不清楚。我想,消费者发送提交请求斡旋,并在情况下,如果经纪人没有一些超时则意味着提交失败内做出回应。我错了吗?

你能澄清commitSync的细节差异,commitAsync? 另外,请提供时提交类型,我应该更喜欢用例。

java apache-kafka offset kafka-consumer-api
2个回答
11
投票

由于它的API文档中说:


这是一个同步提交和将阻塞,直到要么提交成功或遇到不可恢复的错误(在这种情况下,它被抛出给调用者)。

这就是说,通过commitSync是一个阻塞方法。调用它会阻止你的线程,直到它成功或失败。

例如,

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
        consumer.commitSync();
    }
}

对于在for循环,只有经过consumer.commitSync()成功返回或者抛出的异常中断,你的代码将移动到下一个迭代每个迭代。


这是一个异步调用,并不会阻止。遇到的任何错误都要么传递给回调(如果有的话)或丢弃。

这就是说,通过commitAsync是非阻塞方法。调用它不会阻止你的线程。相反,它会继续处理下面的说明,不管它是否会成功还是失败,最终。

例如,类似于前面的例子,但在这里我们使用commitAsync

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
        consumer.commitAsync(callback);
    }
}

对于在for循环,不管会发生什么,最终consumer.commitAsync()每次迭代中,您的代码将移动到下一个迭代。而且,上提交的结果将是由您定义的回调函数来处理。


权衡:延迟与数据一致性

  • 如果你要保证数据的一致性,选择commitSync()因为这将确保,做任何进一步的行动之前,你就会知道是否偏移提交成功还是失败。但由于它是同步和阻塞,你将花费在等待提交到完成,这将导致高等待时间更长的时间。
  • 如果你确定某些数据不一致的,并希望有低延迟,选择commitAsync()因为它不会等待完成。相反,它只会发出提交请求并处理来自卡夫卡的响应(成功或失败)后,同时,您的代码将继续执行。

这是所有一般来说,实际行为将取决于你的实际代码,并在您所呼叫的方法。


2
投票

无论commitSync和commitAsync使用卡夫卡偏移管理功能和既有缺点。如果消息处理成功并提交偏移失败(不是原子),并在同一时间分区重新平衡情况,你处理的消息被一些其他消费再次处理(重复处理)。如果你有好有重复的消息处理,那么你可以去commitAsync(因为它没有阻止,并提供低延迟,并提供了一个高阶提交。所以你应该没问题)。否则,去定制偏移管理,同时处理和更新的偏移量发生原子的护理(使用外部存储偏移)

© www.soinside.com 2019 - 2024. All rights reserved.