我正在使用spring retry(spring-kafka.version 2.9.13)为kafka消费者实现重试机制。我的目标是使用不是由 spring-kafka 自动创建的自定义重试主题(或多个主题)。以下是单个主题的代码片段:
@RetryableTopic(尝试=“3”,fixedDelayTopicStrategy = FixDelayStrategy.SINGLE_TOPIC,include = {Exception.class})
@KafkaListener(主题=“$ {kafka-config.consumer.core-topic}”,groupId =“$ {kafka-config.consumer.group-id}”,containerFactory =“kafkaConcurrentListenerContainerFactory”)
公共无效 receiveMessage(MyPOJO
我正在配置类中扩展 RetryTopicConfigurationSupport ,并按照 spring-retry
的答案中提到的自定义主题命名策略的步骤进行操作但是,当我运行我的 springboot 项目时,它失败并出现以下异常:
原因:java.lang.IllegalStateException:在上下文中找不到单个KafkaTemplate bean;必须存在一个实例,或者一个专门命名的 defaultRetryTopicKafkaTemplate 在 org.springframework.util.Assert.state(Assert.java:97) 在org.springframework.kafka.annotation.RetryableTopicAnnotationProcessor.getKafkaTemplate(RetryableTopicAnnotationProcessor.java:220) 在org.springframework.kafka.annotation.RetryableTopicAnnotationProcessor.processAnnotation(RetryableTopicAnnotationProcessor.java:149) 在 org.springframework.kafka.annotation.RetryTopicConfigurationProvider.findRetryConfigurationFor(RetryTopicConfigurationProvider.java:94) 在org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.processMainAndRetryListeners(KafkaListenerAnnotationBeanPostProcessor.java:509) 在org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.processKafkaListener(KafkaListenerAnnotationBeanPostProcessor.java:488) 在org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.postProcessAfterInitialization(KafkaListenerAnnotationBeanPostProcessor.java:389) 在org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsAfterInitialization(AbstractAutowireCapableBeanFactory.java:455) 在org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1808) 在 org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:620) ...省略20个常用框架
我错过了什么吗?
谢谢!
如果您没有指定
名称,则会查找名称为kafkaTemplate
的 bean。如果没有找到 bean,则会抛出异常。defaultRetryTopicKafkaTemplate
Spring Boot 会自动为我们配置一个带有
KafkaTemplate
名称的 kafkaTemplate
bean。
所以,如果你不创建
defaultRetryTopicKafkaTemplate
bean,那么你需要根据 Spring Boot 自动配置显式指定它:
@RetryableTopic(kafkaTemplate = "kafkaTemplate")