Apache Kafka文档说明:
内部Kafka Streams使用者max.poll.interval.ms默认值已从300000更改为Integer.MAX_VALUE
由于此值用于检测一批记录的处理时间何时超过给定阈值,是否存在这种“无限”值的原因?
它是否使应用程序无响应?或者,当处理时间过长时,Kafka Streams有不同的方式离开消费者群体?
它是否使应用程序无响应?或者,当处理时间过长时,Kafka Streams有不同的方式离开消费者群体?
Kafka Streams在此上下文中利用了Kafka消费者客户端的心跳功能,因此从对poll()
的调用中解除了心跳(“这个应用程序实例是否还活着吗?”)。两个主要参数是session.timeout.ms
(用于心跳线程)和max.poll.interval.ms
(用于处理线程),它们的差异在https://stackoverflow.com/a/39759329/1743580中有更详细的描述。
引入了心跳,以便允许应用程序实例花费大量时间处理记录而不被视为“没有进展”并因此“死亡”。例如,你的应用程序可以对一分钟的单个记录进行大量的处理,同时仍然心跳到Kafka“嘿,我还活着,我正在取得进展。但我还没完成处理。 敬请关注。”
当然,您可以将max.poll.interval.ms
从其默认值(Integer.MAX_VALUE
)更改为较低的设置,例如,如果您确实希望您的应用程序实例在轮询记录之间花费超过X秒时被视为“死”,那么处理最新一轮记录需要超过X秒的时间。这取决于您的具体用例,这种配置是否有意义 - 在大多数情况下,默认设置是一个安全的赌注。
session.timeout.ms
:使用Kafka的组管理工具时用于检测消费者故障的超时。消费者定期发送心跳以指示其对经纪人的活跃性。如果在此会话超时到期之前代理没有收到心跳,则代理将从该组中删除此使用者并启动重新平衡。请注意,该值必须在group.min.session.timeout.ms和group.max.session.timeout.ms的代理配置中配置的允许范围内。
max.poll.interval.ms
:使用消费者群组管理时poll()调用之间的最大延迟。这为消费者在获取更多记录之前可以闲置的时间量设置了上限。如果在此超时到期之前未调用poll(),则认为使用者失败,并且该组将重新平衡以便将分区重新分配给另一个成员。