我有一个 Kafka 流应用程序,我创建了自定义生产/消费/未捕获的异常处理程序,尽管出现异常,但所有处理程序都返回
CONTINUE
以继续处理。在这种情况下,Kafka 流是否提交偏移量?我查看了代码,似乎没有,但这对我来说没有多大意义。
在内部,Streams API 利用 Kafka 的消费者客户端读取输入主题,并使用 commit.interval.ms 中设置的值定期提交已处理消息的偏移量。
对于异常处理程序,每个异常处理程序可以根据记录和抛出的异常返回 FAIL 或 CONTINUE。返回 FAIL 将表明 Streams 应关闭,而 CONTINUE 将表明 Streams 应忽略该问题并继续处理。
无论哪种情况,偏移量上的提交都将由 Streams API 在内部执行。
只是为了纠正您在异常处理期间为偏移提交提供的信息,
我已经动手完成了提交偏移量部分。