我正在使用 spring kafka 2.X,现在想迁移到 3.0
目前:
我有一个侦听器容器工厂设置,仅用于重试功能,我在其中设置恢复,如下所示:
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> retryConcurrentKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// Configure the RetryTemplate
RetryTemplate retryTemplate = new RetryTemplate();
// Configure your retry policy, backoff policy, and any other necessary options
// retryTemplate.setRetryPolicy(...);
// retryTemplate.setBackOffPolicy(...);
RetryOperationsInterceptor interceptor = new RetryOperationsInterceptor();
interceptor.setRetryOperations(retryTemplate);
factory.setRetryTemplate(retryTemplate);
RecoveryCallback<Object> recoveryCallback = context -> {
ConsumerRecord<?, ?> record = (ConsumerRecord<?, ?>) context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_RECORD);
System.out.println("Recovery callback invoked for record: " + record.value());
Acknowledgment acknowledgment = (Acknowledgment) context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT);
System.out.println("Recovery callback invoked for record: " + record.value());
if (acknowledgment != null) {
// Manually acknowledge the record
acknowledgment.acknowledge();
}
return null;
};
factory.setRecoveryCallback(recoveryCallback);
// Other configuration options for the factory
return factory;
}
现在使用kafka spring 3.0版本我怎样才能实现同样的事情?
我参考了官方文档,但没有帮助。
查看
DefaultErrorHandler
及其 BackOff
和 ConsumerRecordRecoverer
选项:https://docs.spring.io/spring-kafka/reference/kafka/annotation-error-handling.html#default-eh.
所以,你必须使用
factory.setRetryTemplate(retryTemplate);
而不是
factory.setRecoveryCallback(recoveryCallback);
和
factory.setCommonErrorHandler(new DefaultErrorHandler(...));