有没有办法使用 Kafka Stream 手动提交?
通常使用
KafkaConsumer
,我会执行如下操作:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records){
// process records
}
consumer.commitAsync();
}
我手动调用提交。我没有看到类似的 API
KStream
。
提交由 Streams 在内部完全自动处理,因此通常没有理由手动提交。请注意,Streams 处理此问题的方式与消费者自动提交不同 - 事实上,内部使用的消费者会禁用自动提交,而 Streams 会“手动”管理提交。原因是,提交只能在处理过程中的某些点发生,以确保不会丢失数据(在更新状态和刷新结果方面存在许多内部依赖性)。
对于更频繁的提交,您可以通过
StreamsConfig
参数commit.interval.ms
减少提交间隔。
尽管如此,可以通过低级处理器 API 间接进行手动提交。您可以使用通过
context
方法提供的 init()
对象来调用 context#commit()
。请注意,这只是“请求 Streams”尽快提交——它并不是直接发出提交。