Spring Kafka:如何启动已停止的 MessageListenerContainer

问题描述 投票:0回答:1

我使用Spring Kafka。如果处理过程中发生错误,我想停止接收消息,然后在一段时间后恢复接收。 我看到 spring kafka 有一个

CommonContainerStoppingErrorHandler
处理程序。 但据我了解,它只是停止阅读。是否可以使用标准方式开始回读?

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String,String> kafkaListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
        factory.setConcurrency(2);
        factory.setCommonErrorHandler(new CommonContainerStoppingErrorHandler());
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

我现在已经编写了一个处理程序,但对我来说这似乎不是一个好的解决方案。

    @Bean
    CommonErrorHandler eh() {
        return new DefaultErrorHandler((rec, ex) -> {
            System.out.println("Close");
        }, new FixedBackOff(1000L, 1)) {

            @Override
            public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records,Consumer<?, ?> consumer, MessageListenerContainer container) {
                if (container!=null && container.isRunning()){
                    container.stop();
                    taskExecuror.schedule(()->container.start(), delay);
                }
            }
        };
    }

还有一个问题,据我了解,当指定并发2时,会在主容器内部创建子容器,这些子容器对消费者负责。父容器被传递给方法

public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records,Consumer<?, ?> consumer, MessageListenerContainer container)
,并为此容器调用
stop
停止接收来自所有分区的消息。如果当前正在其中一个容器中处理一条消息,会发生什么情况,
stop
方法会等待处理结束吗?处理后偏移量会保存吗?

java spring spring-kafka
1个回答
0
投票

在第一个场景中,如果您想在一段时间后继续处理,为什么不能使用具有正常重试功能的错误处理程序?您可以尝试使用指数退避或类似的方法重试。据我所知,这达到了相同的结果。如果您有严格的用例,需要在延迟一段时间后继续容器,那么以编程方式启动容器(如上所示)是一种有效的策略。

对于第二个问题,停止容器时,会等待所有获取的记录处理完毕才停止容器。如果您在容器上启用

stopImmediate
属性,则容器会在处理当前记录后停止。

© www.soinside.com 2019 - 2024. All rights reserved.