Spring-Kafka如何处理多个Kafka Listener和多个Dead Letter Queue?

问题描述 投票:0回答:1

我正在做一个 Spring Boot 2.7.9 项目。该项目有 3 个 Kafka 监听器,配置如下:

@Configuration
@EnableKafka
public class ConsumerKafkaConfig {

    private final KafkaProperties kafkaProperties;

    public ConsumerKafkaConfig(KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
    }

    @Bean
    public ConsumerFactory<String, Object> firstEventFactory() {
        Map<String, Object> props = kafkaProperties.buildConsumerProperties();
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> firstContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(firstEventFactory());

        return factory;
    }

    @Bean
    public ConsumerFactory<Object, Object> secondConsumerEventFactory() {
        final Map<String, Object> properties = kafkaProperties.buildConsumerProperties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return new DefaultKafkaConsumerFactory<>(properties);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> secondConsumerEventContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(secondConsumerEventFactory());

        return factory;
    }

    @Bean
    public ConsumerFactory<String, Object> thirdConsumerEventFactory() {
        final Map<String, Object> properties = kafkaProperties.buildConsumerProperties();
        properties.put(AbstractKafkaSchemaSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName());
        properties.put(AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false);
        properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return new DefaultKafkaConsumerFactory<>(
                properties
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> thirdConsumerEventContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(thirdConsumerEventFactory());
        return factory;
    }

}

对于containerFactories

firstContainerFactory
secondContainerFactory
,如果无法处理消息,我希望将它们发布到特定的死信队列:

  • firstContainerFactory
    将错误发送到名为:
    my-first-dlq
  • 的主题
  • secondContainerFactory
    将错误发送到名为:
    my-second-dlq
  • 的主题

在最新版本的 Spring 中,setErrorHandler() 方法现已弃用并由

setCommonErrorHandler()
取代。但是,我无法找到如何将消息发布到特定的 DLQ。

另一方面,对于

secondContainerFactory
,如果附加了错误,我只想记录错误。

我希望有人能帮助我。

非常感谢:)

spring-boot kafka-consumer-api spring-kafka
1个回答
0
投票

参见

DefaultErrorHandler
及其
ConsumerRecordRecoverer
作为ctor arg。那个可以是
DeadLetterPublishingRecoverer
。你在那里注入
KafkaTemplate
BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver
。默认的是这样的:

private static final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
    DEFAULT_DESTINATION_RESOLVER = (cr, e) -> new TopicPartition(cr.topic() + ".DLT", cr.partition());

所以,您可能可以为自己的函数建模,分别解析为那些

my-first-dlq
my-second-dlq

只要登录就有一个

CommonLoggingErrorHandler
.

在文档中查看更多信息:https://docs.spring.io/spring-kafka/reference/html/#dead-letters

© www.soinside.com 2019 - 2024. All rights reserved.