我有一个自定义的死信恢复器,我已经实现了它,以便我可以重写 createProducer 方法
protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
TopicPartition topicPartition, Headers headers, @Nullable byte[] key, @Nullable byte[] value)
这是必需的,因为我的 DLT 需要与源不同的模式。我需要从我的应用程序环境特定(开发、审查、测试、生产)yaml 中注入 spring @Values 来获取值来创建这个新模式并生成 DLT。
import org.springframework.beans.factory.annotation.Value
public class MyDeadLetterPublishingRecoverer extends DeadLetterPublishingRecoverer
{
@Value("${spring.kafka.custom.myValue}")
private String myValue;
public MyDeadLetterPublishingRecoverer (
KafkaOperations<?, ?> template,
BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
super(template, destinationResolver);
}
}
但是,myValue 始终为 null,因为 DeadLetterPublishingRecoverer 不是组件或类似的构造型,因此没有 spring 上下文来启用变量的解析。如果将其中一个构造型添加到 MyDeadLetterPublishingRecoverer 中,则构造函数会抱怨需要一个 bean,以便可以自动连接目标解析器(无论如何我都不想要)
“无法自动装配。未找到‘BiFunction
因此,我正在寻找一种使 @value 注释在自定义 DeadLetterPublishingRecoverer 中工作的方法,或者一种从 application.yml 中提取值以在自定义 DeadLetterPublishingRecoverer 中使用的替代方法
只需将恢复器定义为
@Bean
,或将其注释为 @Component
,Spring 将处理所有接线。
如果您使用 Spring Boot,只需存在 bean 就足够了,Boot 会将其连接到容器工厂(如果您使用 Boot 的自动配置工厂)。
如果您不使用 boot,则必须自己设置容器工厂。
更正:启动将自动配置包含恢复器的
CommonErrorHandler
,而不是恢复器本身。
如果您不需要目标解析器,请实现更简单的构造函数(仅采用
KafkaOperations
- 模板的构造函数)。
package com.vaultspay.common.config.kafka;
import java.util.HashMap;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;
import com.vaultspay.common.constant.kafka.KafkaConfig;
import lombok.extern.slf4j.Slf4j;
@Configuration
@Slf4j
public class KafkaConsumerConfig {
@Value("${spring.kafka.max-attempts}")
private int maxAttempts;
@Value("${spring.kafka.attempt-interval}")
private int retryInterval;
@Value("${spring.application.name}")
private String applicationName;
private final KafkaProps kafkaProps;
private final KafkaTemplate<String, Object> kafkaTemplate;
KafkaConsumerConfig(KafkaProps kafkaProps, KafkaTemplate<String, Object> kafkaTemplate) {
this.kafkaProps = kafkaProps;
this.kafkaTemplate = kafkaTemplate;
}
@Bean("kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> concurrentKafkaListenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();
concurrentKafkaListenerContainerFactory
.setConsumerFactory(new DefaultKafkaConsumerFactory<String, Object>(kafkaProps.consumerProps()));
DeadLetterPublishingRecoverer deadLetterPublishingRecoverer = new DeadLetterPublishingRecoverer(kafkaTemplate,
(record, ex) -> {
if (applicationName != "") {
System.out.println("********************************************");
System.out.println("CONSUMER EXCEPTiON HANDLER START : " + applicationName);
System.out.println("value : " + record.value().toString());
System.out.println("offset : " + record.offset());
System.out.println("partition : " + record.partition());
System.out.println("timestamp : " + record.timestamp());
System.out.println("topic : " + record.topic());
System.out.println("getMessage : " + ex.getMessage());
System.out.println("ex.getClass().getName() : " + ex.getClass().getName());
System.out.println("getLocalizedMessage : " + ex.getLocalizedMessage());
// System.out.println("key : " + record.key().toString());
System.out.println("********************************************");
String originalValue = (String) record.value();
HashMap<String, Object> newKafkaRecord = new HashMap<String, Object>();
newKafkaRecord.put("microserviceName", applicationName);
newKafkaRecord.put("record", originalValue);
kafkaTemplate.send(KafkaConfig.DEAD_LETTER_TOPIC_NAME, newKafkaRecord);
log.info("Exception {} occurred sending the record to the error topic {}", ex.getMessage(),
KafkaConfig.DEAD_LETTER_TOPIC_NAME);
return new TopicPartition(KafkaConfig.DEAD_LETTER_TOPIC_NAME, -1);
} else {
return null;
}
});
CommonErrorHandler errorHandler = new DefaultErrorHandler(deadLetterPublishingRecoverer,
new FixedBackOff(retryInterval, maxAttempts));
concurrentKafkaListenerContainerFactory.setCommonErrorHandler(errorHandler);
return concurrentKafkaListenerContainerFactory;
}
}