DefaultErrorHandler 不可配置如果 @RetryableTopic 用于重试和 DLT 处理程序

问题描述 投票:0回答:1

春季启动版本:2.7.6 春季卡夫卡版本:2.8.11

问题:

我试图处理代码中的反序列化问题。为了在代码中处理此类问题,我通过扩展创建了自己的类

DefaultErrorHandler

并覆盖

public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer, MessageListenerContainer container, boolean batchListener) {} 
示例代码如下

public class CustomDefaultErrorHandler extends DefaultErrorHandler {

    private static Logger log = LoggerFactory.getLogger(CustomDefaultErrorHandler.class);
    @Override
    public void handleOtherException(Exception thrownException, Consumer<?, ?> consumer, MessageListenerContainer container, boolean batchListener) {
        manageException(thrownException, consumer);
    }

    private void manageException(Exception ex, Consumer<?, ?> consumer) {
        log.error("Error polling message: " + ex.getMessage());
        if (ex instanceof RecordDeserializationException) {
            RecordDeserializationException rde = (RecordDeserializationException) ex;
            consumer.seek(rde.topicPartition(), rde.offset() + 1L);
            consumer.commitSync();
        } else {
            log.error("Exception not handled");
        }
    }
}

如果我将 @RetryableTopic 与 @KafkaListener 一起使用

@RetryableTopic(listenerContainerFactory = "kafkaListenerContainerFactory", backoff = @Backoff(delay = 8000, multiplier = 2.0),
        dltStrategy = DltStrategy.FAIL_ON_ERROR
        , traversingCauses = "true", autoCreateTopics = "true", numPartitions = "3", replicationFactor = "3",
        fixedDelayTopicStrategy = FixedDelayStrategy.MULTIPLE_TOPICS, include = {RetriableException.class, RecoverableDataAccessException.class,
        SQLTransientException.class, CallNotPermittedException.class}
)
@KafkaListener(topics = "${topic.name}", groupId = "order", containerFactory = "kafkaListenerContainerFactory", id = "OTR")
public void consumeOTRMessages(ConsumerRecord<String, PayloadsVO> payload, @Header(KafkaHeaders.RECEIVED_TOPIC) String topicName) throws JsonProcessingException {
    logger.info("Payload :{}", payload.value());
    payloadsService.savePayload(payload.value(), pegasusTopicName);

}

我在调试代码时看到,@RetryableTopic在

中有自己的DefaultErrorHandler配置
ListenerContainerFactoryConfigurer

它会停止我的自定义处理程序,并且反序列化过程不会因问题而停止。

你能建议任何方法吗,因为我想在我的代码中使用注释进行重试过程

我尝试配置我自己的实现

DefaultErrorHandler

通过扩展它并在

中进行配置
ConcurrentKafkaListenerContainerFactory
spring spring-kafka retry-logic spring-boot-3
1个回答
1
投票

这很复杂,但是您应该能够覆盖

RetryTopicComponentFactory
bean 并覆盖
listenerContainerFactoryConfigurer()
以返回自定义错误处理程序。

也就是说,反序列化异常无论如何都会直接进入 DLT。

顺便说一句,在这里调用

commitSync()
是没有价值的,因为
poll()
没有返回任何记录。

© www.soinside.com 2019 - 2024. All rights reserved.