Spring Kafka:处理长时间作业时如何防止分区撤销

问题描述 投票:0回答:1

我想问一下,有没有办法防止consumer在使用@KafkaListener处理长时间操作时被撤销?

我有一个应用程序,其中的作业需要花费大量时间来处理(30-60 分钟),目前 max.poll.interval.ms 设置为 90 分钟。当应用程序的新实例出现时,会进行重新平衡,并且如果存在实际处理作业,则来自其他分区的消息将被挂起(因为分区/组被撤销)。即使主题正在重新平衡,是否有任何方法仍然可以由消费者处理新消息?

apache-kafka kafka-consumer-api spring-kafka
1个回答
0
投票

Kafka 并不是真正为这种场景设计的。

但是,您可以在收到记录时将

max.poll.records
设置为
1
,以及
pause()
容器。

将处理交给另一个线程并退出监听器;容器将在处理时继续轮询代理,但不返回任何记录。处理完成后,确认记录(使用

AckMode.MANUAL
),提交其偏移量,然后
resume()
容器。

如果处理时发生重新平衡,容器将重新暂停消费者。

如果您想在处理之前提交偏移量(冒着记录丢失的风险),则不需要手动确认。

© www.soinside.com 2019 - 2024. All rights reserved.