其缺点是,虽然commitSync()将重试所述提交,直到它成功或遇到非retriable失败,commitAsync()将不会重试。
这句话是我也不清楚。我想,消费者发送提交请求斡旋,并在情况下,如果经纪人没有一些超时则意味着提交失败内做出回应。我错了吗?
你能澄清commitSync
的细节差异,commitAsync
?
另外,请提供时提交类型,我应该更喜欢用例。
由于它的API文档中说:
这是一个同步提交和将阻塞,直到要么提交成功或遇到不可恢复的错误(在这种情况下,它被抛出给调用者)。
这就是说,通过commitSync
是一个阻塞方法。调用它会阻止你的线程,直到它成功或失败。
例如,
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
consumer.commitSync();
}
}
对于在for循环,只有经过consumer.commitSync()
成功返回或者抛出的异常中断,你的代码将移动到下一个迭代每个迭代。
这是一个异步调用,并不会阻止。遇到的任何错误都要么传递给回调(如果有的话)或丢弃。
这就是说,通过commitAsync
是非阻塞方法。调用它不会阻止你的线程。相反,它会继续处理下面的说明,不管它是否会成功还是失败,最终。
例如,类似于前面的例子,但在这里我们使用commitAsync
:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
consumer.commitAsync(callback);
}
}
对于在for循环,不管会发生什么,最终consumer.commitAsync()
每次迭代中,您的代码将移动到下一个迭代。而且,上提交的结果将是由您定义的回调函数来处理。
权衡:延迟与数据一致性
commitSync()
因为这将确保,做任何进一步的行动之前,你就会知道是否偏移提交成功还是失败。但由于它是同步和阻塞,你将花费在等待提交到完成,这将导致高等待时间更长的时间。commitAsync()
因为它不会等待完成。相反,它只会发出提交请求并处理来自卡夫卡的响应(成功或失败)后,同时,您的代码将继续执行。这是所有一般来说,实际行为将取决于你的实际代码,并在您所呼叫的方法。
无论commitSync和commitAsync使用卡夫卡偏移管理功能和既有缺点。如果消息处理成功并提交偏移失败(不是原子),并在同一时间分区重新平衡情况,你处理的消息被一些其他消费再次处理(重复处理)。如果你有好有重复的消息处理,那么你可以去commitAsync(因为它没有阻止,并提供低延迟,并提供了一个高阶提交。所以你应该没问题)。否则,去定制偏移管理,同时处理和更新的偏移量发生原子的护理(使用外部存储偏移)