kafka期间的轮询呼叫是否重新平衡了繁忙的等待?

问题描述 投票:0回答:1

我正在初始化Kafka使用者并在接收和处理消息后手动调用kafka commit时将属性enable.auto.commit设置为false来使用手动kafka commit。

但是由于我的用户处理消息很花时间,所以我得到了Exception with message "error": "Broker: Group rebalance in progress"

原因是重新平衡超时后的提交被此错误拒绝。现在,为此的恢复操作是退出并重新实例化该过程,该过程将再次触发重新平衡和分区分配。另一种方法是捕获此异常,然后照常继续执行,只有在poll()调用被阻塞直到重新平衡完成之前,它才能正常工作,否则它将从批处理中提取下一个数据包,并可能成功处理并提交,从而导致重新平衡时丢失提交失败的消息。

因此,需要知道处理这种情况的正确方法是什么,我应该重新实例化该过程还是应该捕获并忽略该异常?

apache-kafka kafka-consumer-api
1个回答
0
投票

[最好的方法是忽略它是否偶尔发生,并且如果它经常发生,则降低max.poll.records或增加max.poll.interval.ms以确保它仅偶尔发生。另外,请确保您的代码可以处理重复的记录(如果您不能执行重复的记录,那么答案会有所不同)。

您所看到的错误是,正如您可能意识到的那样,因为当消费者做出承诺时,该小组已经决定它可能已经消失了,因此,作为重新平衡的一部分,其他消费者选择了它的分区-新消费者将从最后提交的偏移量开始,因此重复。

鉴于最初的消费者还活着,而且毫无疑问,它将再次投票,从而引发另一次重新平衡。此轮询不会阻止等待重新平衡发生-每个轮询都允许有关组当前状态的一些通信(在轮询线程内),并且在多次轮询后,将同意并接受新的分区分配,然后重新平衡被认为是竞争,并且轮询将告诉消费者其分区分配并返回一组记录。

© www.soinside.com 2019 - 2024. All rights reserved.