I want to stop processing subsequent messages in spring-kafka once an exception occurs


When an exception occurs, the Kafka listener must not process any new messages. It should wait until the exceptional condition is resolved (which may require manual intervention) and only then resume consuming messages.

Configuration

@Bean("consumerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, ? extends SpecificRecordBase>
    kafkaListenerContainerFactory() {

        String concurrency = consumer == null ?  "1" : consumer.getConcurrency();
        ConcurrentKafkaListenerContainerFactory<String, ? extends SpecificRecordBase> factory =
                new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        factory.setConcurrency(Integer.parseInt(concurrency));
        factory.setCommonErrorHandler(getDefaultErrorHandler());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        log.info("Kafka Consumer Created Successfully");
        return factory;
    }

    @Bean
    public DefaultErrorHandler getDefaultErrorHandler() {
        DefaultErrorHandler defaultErrorHandler = new DefaultErrorHandler();
        defaultErrorHandler.setAckAfterHandle(false);
        return defaultErrorHandler;
    }

    public Map<String, Object> consumerConfigs() {

        ConsumerConfiguration config = consumer == null ? new ConsumerConfiguration() : consumer;
        Map<String, Object> consumerProperties = new HashMap<>(kafkaCommonConfigs());

        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);

        if (StringUtils.isNotBlank(config.getMaxPollInterval())) {
            consumerProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
                    Integer.parseInt(config.getMaxPollInterval()));
        }

        if (StringUtils.isNotBlank(config.getSessionTimeout())) {
            consumerProperties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
                    Integer.parseInt(config.getSessionTimeout()));
        }
        consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, EARLIEST);
        consumerProperties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, READ_COMMITTED);
        consumerProperties.put(SPECIFIC_AVRO_READER_CONFIG, true);

        return consumerProperties;
    }

Listener

@KafkaListener(topics = "#{topicName}",
        groupId = "#{consumerGroupId}",
        containerFactory = "consumerContainerFactory")
public void listenProgramNumberMessages(ConsumerRecord<String, Event> record,
                                        Acknowledgment acknowledgment) {
    try {
        var event = record.value();
        log.info("Received message: {}", event);
        if (isProcessingAllowed(event)) {
            log.info("Consumed message: {}", event);
            // Simulated failure for testing
            if (event.getCustomerChoice().getAbbreviatedDescription()
                    .contains("BLENDER")) {
                throw new RuntimeException();
            }
            listenerService.doSomething();
            acknowledgment.acknowledge();
        }
    } catch (Exception e) {
        log.error("Exception occurred :", e);
        // Re-seek so this record is redelivered after the 1 s sleep
        acknowledgment.nack(Duration.ofMillis(1000));
    }
}

I am testing this by explicitly throwing a RuntimeException. If I change/publish something related to BLENDER, I get the exception in the logs and the lag count increases; but when I then change/publish something unrelated to BLENDER, that message gets consumed. I want the messages to stay stuck. Is there a way to do this? Spring-Kafka version is 2.8.1, Spring Boot 2.7.

spring-boot spring-kafka
1 Answer

You can use a CommonContainerStoppingErrorHandler (it stops the container completely; the container can then be restarted via the KafkaListenerEndpointRegistry bean), or use a DefaultErrorHandler configured with infinite retries (a FixedBackOff with Long.MAX_VALUE attempts) and a suitable retry interval; the default back off is 9 retries with no interval.
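A minimal sketch of the container-stopping option, assuming it lives in the same configuration class as the factory in the question; the bean method name and the listener id "programNumberListener" are illustrative, not from the question. Note that a container error handler only runs if the exception propagates out of the listener, so the catch/nack block in the question's listener would need to be removed (or the exception rethrown) for either approach to take effect.

import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.CommonContainerStoppingErrorHandler;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.MessageListenerContainer;

// Stops the whole container on the first unhandled exception; the failed
// record's offset is never committed, so it is redelivered after a restart.
@Bean
public CommonErrorHandler containerStoppingErrorHandler() {
    return new CommonContainerStoppingErrorHandler();
}

// Call once the underlying problem has been fixed manually. Assumes the
// @KafkaListener is given id = "programNumberListener" (hypothetical).
public void restartListener(KafkaListenerEndpointRegistry registry) {
    MessageListenerContainer container =
            registry.getListenerContainer("programNumberListener");
    if (container != null && !container.isRunning()) {
        container.start();
    }
}

Wire it in with factory.setCommonErrorHandler(containerStoppingErrorHandler()) in place of the existing getDefaultErrorHandler() call.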

https://docs.spring.io/spring-kafka/docs/current/reference/html/#container-stopping-error-handlers

https://docs.spring.io/spring-kafka/docs/current/reference/html/#default-eh
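For the retry-forever option, a sketch that replaces the question's getDefaultErrorHandler() bean; the 5-second interval is an arbitrary choice for illustration:

import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

// Retries the failed record indefinitely, pausing 5 s between attempts;
// records behind it are not consumed until it finally succeeds.
@Bean
public DefaultErrorHandler getDefaultErrorHandler() {
    DefaultErrorHandler handler =
            new DefaultErrorHandler(new FixedBackOff(5000L, Long.MAX_VALUE));
    handler.setAckAfterHandle(false);
    return handler;
}

As above, the exception must reach the error handler (i.e. not be swallowed in the listener) for the back off to apply.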
