消息处理时收到Kafka错误[关闭]

问题描述 投票:-1回答:2

偏移{<TOPICNAME>-0=OffsetAndMetadata{offset=4, metadata=''}}的异步自动提交失败: 由于该组已经重新平衡并将分区分配给另一个成员,因此无法完成提交。这意味着后续调用poll()之间的时间比配置的max.poll.interval.ms长,这通常意味着轮询循环花费了太多时间进行消息处理。 您可以通过增加会话超时或通过使用poll()减少max.poll.records中返回的批次的最大大小来解决此问题。

spring-boot apache-kafka
2个回答
1
投票

对消费者设定的session.timeout.ms应该低于卡夫卡经纪人设定的group.max.session.timeout.ms

请参考:

https://github.com/dpkp/kafka-python/issues/746


0
投票

Session.time.out控制消费者在不向组协调器发送心跳的情况下可以运行多长时间。

设置更高的值意味着需要更长的时间来检测真正的故障。所以我们不应该增加session.time.out。

我们现在可以减少max.poll.records或增加max.poll.interval.ms来解决这个问题。

如果我们没有遵循GroupCoordinator的超时条件,那么我们将得到无效的会话超时。

kafka.coordinator.group.GroupCoordinator

 if (sessionTimeoutMs < groupConfig.groupMinSessionTimeoutMs ||
      sessionTimeoutMs > groupConfig.groupMaxSessionTimeoutMs) {
      responseCallback(joinError(memberId, Errors.INVALID_SESSION_TIMEOUT))
    }
© www.soinside.com 2019 - 2024. All rights reserved.