我正在使用 spring 框架和 spring Kafka 为一些 kafka 主题编写消费者。由于某种原因,我必须使用 spring-kafka 1.3。我已经完成手动配置来读取所有主题,创建侦听器,将其包装在容器中,然后启动所有主题。我正在使用带有手动确认功能的批量消息侦听器。
我希望应用程序在关闭时正常停止。我正在考虑让消费者暂停,处理获取的所有剩余有效负载,然后停止容器。当所有容器都停止时,我会停止并退出应用程序。但是,旧版本没有容器的暂停方法,也没有任何访问 Consumer
stop 方法等待容器在定义的超时值内处理有效负载,然后停止它,这可能会导致有效负载被丢弃或处理两次。
由于您自己构建容器,因此您可以访问消费者。如果您创建一个 ConsumerService,其中包含所有使用的消费者的内部列表,并向其添加一个关闭钩子,会怎么样?
@Service
public class ConsumerService {
private final List<KafkaConsumer> consumers = new ArrayList<KafkaConsumer>();
public KafkaConsumer<String, String> requestConsumer(){
var consumer = createConsumer();
consumers.add(consumer);
return consumer;
}
@PreDestroy
public void destroy() {
//here do the clean, like for example not ACK the messages, so another consumer can pick them up?
}
}