Spring Kafka RecoveryCallback migration

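I am using Spring Kafka 2.x and now want to migrate to 3.0.

Currently:

I have a listener container factory set up solely for retry functionality, in which I configure recovery as follows: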

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> retryConcurrentKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());

    // Configure the RetryTemplate
    RetryTemplate retryTemplate = new RetryTemplate();
    // Configure your retry policy, backoff policy, and any other necessary options
    // retryTemplate.setRetryPolicy(...);
    // retryTemplate.setBackOffPolicy(...);

  
    factory.setRetryTemplate(retryTemplate);

    RecoveryCallback<Object> recoveryCallback = context -> {
        // The failed record and its acknowledgment are stored in the retry context
        ConsumerRecord<?, ?> record =
                (ConsumerRecord<?, ?>) context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_RECORD);
        Acknowledgment acknowledgment =
                (Acknowledgment) context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT);

        System.out.println("Recovery callback invoked for record: " + record.value());

        if (acknowledgment != null) {
            // Manually acknowledge the record
            acknowledgment.acknowledge();
        }

        return null;
    };
    };
    factory.setRecoveryCallback(recoveryCallback);

    // Other configuration options for the factory
    return factory;
}

How can I achieve the same thing with Spring Kafka 3.0?

I have consulted the official documentation, but it did not help.

spring-boot apache-kafka kafka-consumer-api spring-kafka
1 Answer

See DefaultErrorHandler and its BackOff and ConsumerRecordRecoverer options: https://docs.spring.io/spring-kafka/reference/kafka/annotation-error-handling.html#default-eh

So, instead of

factory.setRetryTemplate(retryTemplate);
factory.setRecoveryCallback(recoveryCallback);

you have to use

factory.setCommonErrorHandler(new DefaultErrorHandler(...));
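To make the replacement concrete, here is a minimal sketch of how the factory from the question might look in 3.0. The class name RetryKafkaConfig, the injected ConsumerFactory parameter, and the back-off values (1 second between attempts, 2 retries) are assumptions chosen for illustration, not something the question or answer specifies. The ConsumerRecordRecoverer plays the role of the old RecoveryCallback, and the BackOff replaces the RetryTemplate's retry and back-off policies.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConsumerRecordRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

// Hypothetical configuration class name, used only for this sketch.
@Configuration
public class RetryKafkaConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> retryConcurrentKafkaListenerContainerFactory(
            ConsumerFactory<Object, Object> consumerFactory) { // assumed to be defined elsewhere
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);

        // Called once the retries are exhausted; takes over the job of the old RecoveryCallback.
        ConsumerRecordRecoverer recoverer = (record, exception) ->
                System.out.println("Recovery invoked for record: " + record.value());

        // Assumed values: wait 1000 ms between attempts, retry at most 2 times.
        FixedBackOff backOff = new FixedBackOff(1000L, 2L);

        // Replaces both setRetryTemplate(...) and setRecoveryCallback(...).
        factory.setCommonErrorHandler(new DefaultErrorHandler(recoverer, backOff));
        return factory;
    }
}
```

The recoverer receives the failed ConsumerRecord directly, so there is no retry context to read attributes from. The explicit acknowledgment step from the 2.x callback is not reproduced here; how offsets are committed after recovery depends on the container's ack mode, so that part is worth checking against the reference documentation linked above.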
