Spring Kafka 优雅关闭

问题描述 投票:0回答:1

我正在使用 spring 框架和 spring Kafka 为一些 kafka 主题编写消费者。由于某种原因,我必须使用 spring-kafka 1.3。我已经完成手动配置来读取所有主题,创建侦听器,将其包装在容器中,然后启动所有主题。我正在使用带有手动确认功能的批量消息侦听器。

我希望应用程序在关闭时正常停止。我正在考虑让消费者暂停,处理获取的所有剩余有效负载,然后停止容器。当所有容器都停止时,我会停止并退出应用程序。但是,旧版本没有容器的暂停方法,也没有任何访问 Consumer 对象的方法。有什么办法可以在不更新到新版本的情况下实现这一目标吗?

stop 方法等待容器在定义的超时值内处理有效负载,然后停止它,这可能会导致有效负载被丢弃或处理两次。

java spring apache-kafka spring-kafka
1个回答
0
投票

由于您自己构建容器,因此您可以访问消费者。如果您创建一个 ConsumerService,其中包含所有使用的消费者的内部列表,并向其添加一个关闭钩子,会怎么样?

@Service
public class ConsumerService {

    private final List<KafkaConsumer> consumers = new ArrayList<KafkaConsumer>();

    public KafkaConsumer<String, String> requestConsumer(){
        var consumer = createConsumer();
        consumers.add(consumer);
        return consumer;   
    }

    @PreDestroy
    public void destroy() {
        //here do the clean, like for example not ACK the messages, so another consumer can pick them up?
    }

}
© www.soinside.com 2019 - 2024. All rights reserved.