Custom DeadLetterPublishingRecoverer is never invoked


We are upgrading from Spring Boot 2 / spring-kafka 2.8.4 to Spring Boot 3 / spring-kafka 3.1.2 and have to move from SeekToCurrentErrorHandler to a CommonErrorHandler. Our first attempt was to replace the method that created the SeekToCurrentErrorHandler with one that creates a DefaultErrorHandler, using the constructor that takes a ConsumerRecordRecoverer and a BackOff. The BiFunction behind our ConsumerRecordRecoverer is never invoked.
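
A minimal sketch of the change we made (the recoverer and backOff variables here stand in for our real objects, shown further below):

    // before the upgrade (spring-kafka 2.8.x)
    SeekToCurrentErrorHandler oldHandler = new SeekToCurrentErrorHandler(recoverer, backOff);

    // after the upgrade (spring-kafka 3.1.x)
    DefaultErrorHandler newHandler = new DefaultErrorHandler(recoverer, backOff);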

What are we doing wrong in this upgrade?

We also tried creating a DefaultErrorHandler bean with this setup, with the same result.

Here is our setup in the @Configuration class annotated with @EnableKafka:

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> consumerFactory,
            KafkaTemplate<Object, Object> kafkaTemplate) {
        final var factory = new ConcurrentKafkaListenerContainerFactory<>();

        configurer.configure(factory, consumerFactory);

        factory.setCommonErrorHandler(
                feedsErrorHandler(kafkaTemplate, retryProperties.getMainTopic()));
        return factory;
    }

    private DefaultErrorHandler feedsErrorHandler(
            KafkaTemplate<? extends Object, ? extends Object> kafkaTemplate, RetrySettings retrySettings) {
        final var backOffWithMaxRetries = createBackOffWithMaxRetries(retrySettings);
        return createCommonErrorHandler(kafkaTemplate, backOffWithMaxRetries);
    }

    private ExponentialBackOffWithMaxRetries createBackOffWithMaxRetries(
            RetrySettings retrySettings) {
        final var backOffWithMaxRetries =
                new ExponentialBackOffWithMaxRetries(retrySettings.getMaxRetries());
        backOffWithMaxRetries.setInitialInterval(retrySettings.getInitialInterval());
        backOffWithMaxRetries.setMultiplier(retrySettings.getMultiplier());
        backOffWithMaxRetries.setMaxInterval(retrySettings.getMaxInterval());
        return backOffWithMaxRetries;
    }

    private DefaultErrorHandler createCommonErrorHandler(
            KafkaTemplate<? extends Object, ? extends Object> kafkaTemplate,
            ExponentialBackOffWithMaxRetries backOffWithMaxRetries) {
        final var defaultErrorHandler =
                new DefaultErrorHandler(
                        deadLetterPublishingRecoverer(kafkaTemplate), backOffWithMaxRetries);

        // Setup Retryable Exceptions here. And add them to the classifications map.
        // All other exceptions default to non-retryable.
        final var classified = new HashMap<Class<? extends Throwable>, Boolean>();
        retryProperties.getRetryableExceptions().forEach(aClass -> classified.put(aClass, true));
        defaultErrorHandler.setClassifications(classified, false);

        defaultErrorHandler.setCommitRecovered(true);

        return defaultErrorHandler;
    }

    private DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
            KafkaTemplate<? extends Object, ? extends Object> kafkaTemplate) {

      return new DeadLetterPublishingRecoverer(
          kafkaTemplate,
          (consumerRecord, e) -> {
            log.error("Error while processing [{}]", consumerRecord, e);
            final var topic = determineTopic(consumerRecord, e.getCause());
            log.error("Moving [{}] to [{}]", consumerRecord, topic);
            return new TopicPartition(topic, -1);
          });
    }


1 Answer

See FailedRecordTracker. The logic in there is like this:

    long nextBackOff = failedRecord.getBackOffExecution().nextBackOff();
    if (nextBackOff != BackOffExecution.STOP) {
        this.backOffHandler.onNextBackOff(container, exception, nextBackOff);
        return false;
    }
    else {
        attemptRecovery(record, exception, topicPartition, consumer);
        map.remove(topicPartition);
        if (map.isEmpty()) {
            this.failures.remove(currentThread);
        }
        return true;
    }

Make sure you have really exhausted the retries.
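
For example, you can attach a RetryListener to your DefaultErrorHandler to see every delivery attempt and whether recovery is ever reached. A minimal sketch against your existing handler:

    defaultErrorHandler.setRetryListeners(new RetryListener() {

        @Override
        public void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt) {
            // Called on every failed delivery attempt, before the next back-off.
            log.warn("Attempt {} failed for record from {}", deliveryAttempt, record.topic(), ex);
        }

        @Override
        public void recovered(ConsumerRecord<?, ?> record, Exception ex) {
            // Called after the recoverer (your DeadLetterPublishingRecoverer) has run successfully.
            log.info("Record from {} recovered", record.topic());
        }

        @Override
        public void recoveryFailed(ConsumerRecord<?, ?> record, Exception ex, Exception failure) {
            // Called when the recoverer itself throws.
            log.error("Recovery failed for record from {}", record.topic(), failure);
        }
    });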

It would be great if you could provide a simple project that lets us reproduce the issue and play with it in debug mode.
