使用 Spring Retry 向固定主题发送 kafka 可重试消息

问题描述 投票:0回答:1

我正在使用spring retry(spring-kafka.version 2.9.13)为kafka消费者实现重试机制。我的目标是使用不是由 spring-kafka 自动创建的自定义重试主题(或多个主题)。以下是单个主题的代码片段:

@RetryableTopic(尝试=“3”,fixedDelayTopicStrategy = FixDelayStrategy.SINGLE_TOPIC,include = {Exception.class}) @KafkaListener(主题=“$ {kafka-config.consumer.core-topic}”,groupId =“$ {kafka-config.consumer.group-id}”,containerFactory =“kafkaConcurrentListenerContainerFactory”) 公共无效 receiveMessage(MyPOJO coreKafkaMessage, 确认 ack) 抛出 IOException { log.info("kafka 测试重试", coreKafkaMessage); throw new Exception("测试重试"); }

我正在配置类中扩展 RetryTopicConfigurationSupport ,并按照 spring-retry

的答案中提到的自定义主题命名策略的步骤进行操作

但是,当我运行我的 springboot 项目时,它失败并出现以下异常:

原因:java.lang.IllegalStateException:在上下文中找不到单个KafkaTemplate bean;必须存在一个实例,或者一个专门命名的 defaultRetryTopicKafkaTemplate 在 org.springframework.util.Assert.state(Assert.java:97) 在org.springframework.kafka.annotation.RetryableTopicAnnotationProcessor.getKafkaTemplate(RetryableTopicAnnotationProcessor.java:220) 在org.springframework.kafka.annotation.RetryableTopicAnnotationProcessor.processAnnotation(RetryableTopicAnnotationProcessor.java:149) 在 org.springframework.kafka.annotation.RetryTopicConfigurationProvider.findRetryConfigurationFor(RetryTopicConfigurationProvider.java:94) 在org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.processMainAndRetryListeners(KafkaListenerAnnotationBeanPostProcessor.java:509) 在org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.processKafkaListener(KafkaListenerAnnotationBeanPostProcessor.java:488) 在org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor.postProcessAfterInitialization(KafkaListenerAnnotationBeanPostProcessor.java:389) 在org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsAfterInitialization(AbstractAutowireCapableBeanFactory.java:455) 在org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1808) 在 org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:620) ...省略20个常用框架

我错过了什么吗?

谢谢!

spring-boot apache-kafka spring-kafka
1个回答
0
投票

参见文档:https://docs.spring.io/spring-kafka/reference/retrytopic/retry-config.html#using-the-retryabletopic-annotation

如果您没有指定

kafkaTemplate
名称,则会查找名称为
defaultRetryTopicKafkaTemplate
的 bean。如果没有找到 bean,则会抛出异常。

Spring Boot 会自动为我们配置一个带有

KafkaTemplate
名称的
kafkaTemplate
bean。

所以,如果你不创建

defaultRetryTopicKafkaTemplate
bean,那么你需要根据 Spring Boot 自动配置显式指定它:

@RetryableTopic(kafkaTemplate = "kafkaTemplate")
© www.soinside.com 2019 - 2024. All rights reserved.