我不清楚为什么我们需要
session.timeout.ms
和 max.poll.interval.ms
以及何时使用其中之一或两者?似乎这两个设置都表明协调器在假设消费者死亡之前等待从消费者那里获取心跳的时间上限。
基于 KIP-62,它在 0.10.1.0+ 版本上的表现如何?
在 KIP-62 之前,只有
session.timeout.ms
(即 Kafka 0.10.0
及更早版本)。 max.poll.interval.ms
是通过KIP-62引入的(Kafka 0.10.1
的一部分)。
KIP-62,通过后台心跳线程将心跳与对
poll()
的调用解耦,允许比心跳间隔更长的处理时间(即两个连续 poll()
之间的时间)。
假设处理一条消息需要 1 分钟。如果心跳和轮询是耦合的(即在 KIP-62 之前),您需要将
session.timeout.ms
设置为大于 1 分钟,以防止消费者超时。但是,如果消费者死亡,也需要超过 1 分钟才能检测到失败的消费者。
KIP-62 将轮询和心跳解耦,允许在两个连续轮询之间发送心跳。现在您有两个线程正在运行,即心跳线程和处理线程,因此,KIP-62 为每个线程引入了超时。
session.timeout.ms
用于心跳线程,而 max.poll.interval.ms
用于处理线程。
假设您设置了
session.timeout.ms=30000
,因此消费者心跳线程必须在该时间到期之前向代理发送心跳。另一方面,如果处理单个消息需要 1 分钟,您可以将 max.poll.interval.ms
设置为大于一分钟,以便为处理线程提供更多时间来处理消息。
如果处理线程终止,则需要
max.poll.interval.ms
才能检测到这一点。但是,如果整个消费者死亡(并且死亡的处理线程很可能使包括心跳线程在内的整个消费者崩溃),只需要 session.timeout.ms
即可检测到它。
这个想法是,即使处理本身需要很长时间,也可以快速检测失败的消费者。
实施细节
新的超时
max.poll.interval.ms
主要是客户端概念:如果poll()
在max.poll.interval.ms
内没有被调用,心跳线程将检测到这种情况并向broker发送离开组请求。 -- max.poll.interval.ms
仍然与消费者组重新平衡相关:如果触发重新平衡,消费者有 max.poll.interval.ms
时间通过调用 poll()
客户端来重新加入组,从而触发加入组请求。