Kafka增加处理超时消息

问题描述 投票:0回答:1

在kafka消费者中,如果处理消息需要5分钟以上,则消息是新处理的,我让配置消费者增加了“ max.poll.interval.ms”和session.timeout.ms]

max.poll.interval.ms = 7200000(2Hrs)session.timeout.ms = 7200000(2Hrs)request.timeout.ms = 7206000(〜2Hrs)

此工作首先开始,然后重新平衡我对消费者有此错误

2019-10-18 15:29:51.739 INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-5, groupId=processgroup] (Re-)joining

group

apache-kafka kafka-consumer-api
1个回答
0
投票

您使用的是正确的方法。您还需要在使用者和用户中设置session.timeout.msKafka代理中的group.max.session.timeout.ms,以避免重新平衡。

订阅了一组主题后,消费者将自动调用poll(long)时加入该组。民意调查API旨在确保消费者的生活。只要您继续致电民意调查,消费者将留在组中并继续接收来自分配给它的分区。在封面下,消费者定期向服务器发送心跳。如果消费者崩溃或在session.timeout.ms的时间内无法发送心跳,然后消费者将被视为死亡,其分区将是重新分配。

max.poll.interval.ms:使用使用者组管理时,调用poll()之间的最大延迟。如果在此超时到期前未调用poll(),则认为使用方失败,该组将重新平衡以将分区重新分配给另一个成员。

注意:请注意,如果最高民意测验增加很多,将会延迟一组重新平衡,因为消费者重新平衡仅在民意调查获得时才加入叫。

request.timeout.ms:该配置控制客户端等待请求响应的最长时间。如果超时之前仍未收到响应,则客户端将在必要时重新发送请求,如果重试已用尽,则会使请求失败。

Note:这间接地不影响重新平衡,但是需要进行设置,因为它必须始终大于max.poll.interval.ms,否则它将抛出配置错误。

session.timeout.ms:使用Kafka的群组管理工具时,超时用于检测使用者故障。消费者定期发送心跳,以向经纪人表明其活跃程度。如果经纪人在此会话超时之前未收到任何心跳信号,则经纪人将从该组中删除此使用者并启动重新平衡。

Note:但是单独设置session.timeout.ms不会很棘手您需要检查Kafka经纪人设置的工作group.min.session.timeout.ms也需要增加。

group.max.session.timeout.ms(在kafka经纪人中):注册使用者的最大允许会话超时。其默认值为30000毫秒。

© www.soinside.com 2019 - 2024. All rights reserved.