如何使用Kafka Stream手动提交?

问题描述 投票:0回答:1

有没有办法使用 Kafka Stream 手动提交?

通常使用

KafkaConsumer
,我会执行如下操作:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records){
       // process records
    }
   consumer.commitAsync();
}

我手动调用提交。我没有看到类似的 API

KStream

apache-kafka apache-kafka-streams
1个回答
40
投票

提交由 Streams 在内部完全自动处理,因此通常没有理由手动提交。请注意,Streams 处理此问题的方式与消费者自动提交不同 - 事实上,内部使用的消费者会禁用自动提交,而 Streams 会“手动”管理提交。原因是,提交只能在处理过程中的某些点发生,以确保不会丢失数据(在更新状态和刷新结果方面存在许多内部依赖性)。

对于更频繁的提交,您可以通过

StreamsConfig
参数
commit.interval.ms
减少提交间隔。

尽管如此,可以通过低级处理器 API 间接进行手动提交。您可以使用通过

context
方法提供的
init()
对象来调用
context#commit()
。请注意,这只是“请求 Streams”尽快提交——它并不是直接发出提交。

© www.soinside.com 2019 - 2024. All rights reserved.