我们有一些长期运行的作业,也是用Spring云流和Kafka binder实现的。我们面临的问题是,由于默认的 max.poll.interval.ms
和 max.poll.records
不适合我们的用例,我们需要给 max.poll.interval.ms
(几小时),而对于 max.poll.records
(如1)要与最长运行的工作保持一致,可以得到消费者的消费。这解决了消费者进入再平衡循环的问题。但是,它给消费者带来了一些操作上的挑战。有时会发生消费者在重启时卡住,不消费任何消息的情况,直到在 max.poll.interval.ms
通过。
这是因为Spring Cloud流投票的实现方式吗?如果我使用同步消费者和管理的 poll()
相应的?
消费者在Kafka日志中记录了心跳的丢失,当消费者出现卡顿的时候,我可以看到消息。
GroupCoordinator 11]: Member consumer-3-f46e14b4-5998-4083-b7ec-bed4e3f374eb in group foo has failed, removing it from the group
Spring云流(消息驱动)并不适合这个应用。最好是自己管理消费者;关闭它后的 poll()
处理作业;创建一个新的消费者并提交偏移量,以及 poll()
再次。