将值注入自定义 DeadLetterPublishingRecoverer

问题描述 投票:0回答:2

我有一个自定义的死信恢复器,我已经实现了它,以便我可以重写 createProducer 方法

protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
            TopicPartition topicPartition, Headers headers, @Nullable byte[] key, @Nullable byte[] value) 

这是必需的,因为我的 DLT 需要与源不同的模式。我需要从我的应用程序环境特定(开发、审查、测试、生产)yaml 中注入 spring @Values 来获取值来创建这个新模式并生成 DLT。

 import org.springframework.beans.factory.annotation.Value

public class MyDeadLetterPublishingRecoverer extends DeadLetterPublishingRecoverer
{
   @Value("${spring.kafka.custom.myValue}")
    private String myValue;

   public MyDeadLetterPublishingRecoverer (
    KafkaOperations<?, ?> template,
    BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
  super(template, destinationResolver);
}

}

但是,myValue 始终为 null,因为 DeadLetterPublishingRecoverer 不是组件或类似的构造型,因此没有 spring 上下文来启用变量的解析。如果将其中一个构造型添加到 MyDeadLetterPublishingRecoverer 中,则构造函数会抱怨需要一个 bean,以便可以自动连接目标解析器(无论如何我都不想要) “无法自动装配。未找到‘BiFunction’类型的 beans

因此,我正在寻找一种使 @value 注释在自定义 DeadLetterPublishingRecoverer 中工作的方法,或者一种从 application.yml 中提取值以在自定义 DeadLetterPublishingRecoverer 中使用的替代方法

spring-boot spring-kafka spring-retry
2个回答
0
投票

只需将恢复器定义为

@Bean
,或将其注释为
@Component
,Spring 将处理所有接线。

如果您使用 Spring Boot,只需存在 bean 就足够了,Boot 会将其连接到容器工厂(如果您使用 Boot 的自动配置工厂)。

如果您不使用 boot,则必须自己设置容器工厂。

更正:启动将自动配置包含恢复器的

CommonErrorHandler
,而不是恢复器本身。

如果您不需要目标解析器,请实现更简单的构造函数(仅采用

KafkaOperations
- 模板的构造函数)。


0
投票
package com.vaultspay.common.config.kafka;

import java.util.HashMap;

import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

import com.vaultspay.common.constant.kafka.KafkaConfig;

import lombok.extern.slf4j.Slf4j;

@Configuration
@Slf4j
public class KafkaConsumerConfig {

    @Value("${spring.kafka.max-attempts}")
    private int maxAttempts;
    @Value("${spring.kafka.attempt-interval}")
    private int retryInterval;

    @Value("${spring.application.name}")
    private String applicationName;

    private final KafkaProps kafkaProps;
    private final KafkaTemplate<String, Object> kafkaTemplate;

    KafkaConsumerConfig(KafkaProps kafkaProps, KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaProps = kafkaProps;
        this.kafkaTemplate = kafkaTemplate;
    }

    @Bean("kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> concurrentKafkaListenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();
        concurrentKafkaListenerContainerFactory
                .setConsumerFactory(new DefaultKafkaConsumerFactory<String, Object>(kafkaProps.consumerProps()));
        DeadLetterPublishingRecoverer deadLetterPublishingRecoverer = new DeadLetterPublishingRecoverer(kafkaTemplate,
                (record, ex) -> {
                    if (applicationName != "") {
                        System.out.println("********************************************");
                        System.out.println("CONSUMER EXCEPTiON HANDLER START : " + applicationName);
                        System.out.println("value : " + record.value().toString());
                        System.out.println("offset : " + record.offset());
                        System.out.println("partition : " + record.partition());
                        System.out.println("timestamp : " + record.timestamp());
                        System.out.println("topic : " + record.topic());
                        System.out.println("getMessage : " + ex.getMessage());
                        System.out.println("ex.getClass().getName() : " + ex.getClass().getName());
                        System.out.println("getLocalizedMessage : " + ex.getLocalizedMessage());
                        // System.out.println("key : " + record.key().toString());
                        System.out.println("********************************************");
                        String originalValue = (String) record.value();
                        HashMap<String, Object> newKafkaRecord = new HashMap<String, Object>();
                        newKafkaRecord.put("microserviceName", applicationName);
                        newKafkaRecord.put("record", originalValue);
                        kafkaTemplate.send(KafkaConfig.DEAD_LETTER_TOPIC_NAME, newKafkaRecord);
                        log.info("Exception {} occurred sending the record to the error topic {}", ex.getMessage(),
                                KafkaConfig.DEAD_LETTER_TOPIC_NAME);
                        return new TopicPartition(KafkaConfig.DEAD_LETTER_TOPIC_NAME, -1);
                    } else {
                        return null;

                    }

                });
        CommonErrorHandler errorHandler = new DefaultErrorHandler(deadLetterPublishingRecoverer,
                new FixedBackOff(retryInterval, maxAttempts));
        concurrentKafkaListenerContainerFactory.setCommonErrorHandler(errorHandler);
        return concurrentKafkaListenerContainerFactory;
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.