I'm working on a Spring Boot 2.7.9 project. The project has 3 Kafka listeners, configured as follows:
```java
@Configuration
@EnableKafka
public class ConsumerKafkaConfig {

    private final KafkaProperties kafkaProperties;

    public ConsumerKafkaConfig(KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
    }

    @Bean
    public ConsumerFactory<String, Object> firstEventFactory() {
        Map<String, Object> props = kafkaProperties.buildConsumerProperties();
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> firstContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(firstEventFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<Object, Object> secondConsumerEventFactory() {
        final Map<String, Object> properties = kafkaProperties.buildConsumerProperties();
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return new DefaultKafkaConsumerFactory<>(properties);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> secondConsumerEventContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(secondConsumerEventFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<String, Object> thirdConsumerEventFactory() {
        final Map<String, Object> properties = kafkaProperties.buildConsumerProperties();
        properties.put(AbstractKafkaSchemaSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName());
        properties.put(AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false);
        properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return new DefaultKafkaConsumerFactory<>(properties);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> thirdConsumerEventContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(thirdConsumerEventFactory());
        return factory;
    }
}
```
For the container factories `firstContainerFactory` and `secondConsumerEventContainerFactory`, when a message cannot be processed I want it published to a specific dead-letter queue:

- `firstContainerFactory` should send failed records to a topic named `my-first-dlq`
- `secondConsumerEventContainerFactory` should send failed records to a topic named `my-second-dlq`
In recent Spring versions, the `setErrorHandler()` method is deprecated and has been replaced by `setCommonErrorHandler()`. However, I can't find out how to publish messages to a specific DLQ.

On the other hand, for `thirdConsumerEventContainerFactory`, if an error occurs I only want to log it.

I hope someone can help me. Many thanks :)
See `DefaultErrorHandler` and its `ConsumerRecordRecoverer` constructor argument. That recoverer can be a `DeadLetterPublishingRecoverer`, into which you inject a `KafkaTemplate` and a `BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver`. The default resolver looks like this:

```java
private static final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>
        DEFAULT_DESTINATION_RESOLVER = (cr, e) -> new TopicPartition(cr.topic() + ".DLT", cr.partition());
```

So you can model your own resolver functions on this one, resolving to `my-first-dlq` and `my-second-dlq` respectively.
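Put together, the first factory might look like the sketch below (assumptions: a `KafkaTemplate<Object, Object>` bean exists in the context, and the 500 ms / 2-retries back-off is illustrative, not something from the question):

```java
import org.apache.kafka.common.TopicPartition;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

// Inside ConsumerKafkaConfig:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> firstContainerFactory(
        KafkaTemplate<Object, Object> kafkaTemplate) {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(firstEventFactory());
    // Records that still fail after the retries go to my-first-dlq.
    // A negative partition in the resolved TopicPartition lets Kafka
    // choose the target partition itself.
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(
            kafkaTemplate,
            (record, ex) -> new TopicPartition("my-first-dlq", -1));
    factory.setCommonErrorHandler(
            new DefaultErrorHandler(recoverer, new FixedBackOff(500L, 2L)));
    return factory;
}
```

`secondConsumerEventContainerFactory` would be wired the same way, with its resolver returning `my-second-dlq` instead.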
For log-only handling there is a `CommonLoggingErrorHandler`.
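For the third factory, that is a one-line change inside the existing bean method (a sketch; `factory` is the local variable from the question's `thirdConsumerEventContainerFactory()`):

```java
import org.springframework.kafka.listener.CommonLoggingErrorHandler;

// In thirdConsumerEventContainerFactory(), before returning the factory:
factory.setCommonErrorHandler(new CommonLoggingErrorHandler());
```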
See the documentation for more: https://docs.spring.io/spring-kafka/reference/html/#dead-letters