我想问一下,有没有办法防止consumer在使用@KafkaListener处理长时间操作时被撤销?
我有一个应用程序,其中的作业需要花费大量时间来处理(30-60 分钟),目前 max.poll.interval.ms 设置为 90 分钟。当应用程序的新实例出现时,会进行重新平衡,并且如果存在实际处理作业,则来自其他分区的消息将被挂起(因为分区/组被撤销)。即使主题正在重新平衡,是否有任何方法仍然可以由消费者处理新消息?
Kafka 并不是真正为这种场景设计的。
但是,您可以在收到记录时将
max.poll.records
设置为 1
,以及 pause()
容器。
将处理交给另一个线程并退出监听器;容器将在处理时继续轮询代理,但不返回任何记录。处理完成后,确认记录(使用
AckMode.MANUAL
),提交其偏移量,然后 resume()
容器。
如果处理时发生重新平衡,容器将重新暂停消费者。
如果您想在处理之前提交偏移量(冒着记录丢失的风险),则不需要手动确认。