Spring 的 Kafka 生产者将类型标头嵌入到消息中,该标头指定消费者应将消息反序列化到哪个类。当生产者不使用 Spring Kafka 而消费者使用时,这是一个问题。在这种情况下,JsonDeserializer 无法反序列化消息并会抛出异常“标头中没有类型信息并且没有提供默认类型”。
解决此问题的一种方法是设置默认的反序列化类型。这在单个主题包含多个消息架构的情况下不起作用。
我发现的另一个解决方案是设置
spring.kafka.consumer.properties.spring.json.use.type.headers
为 false(在 application.properties 文件中)。这不会执行任何操作,因为再次抛出相同的异常。
如何确保 JsonDeserializer 忽略类型标头?
查看该解串器的此选项:
/**
* Set to false to ignore type information in headers and use the configured
* target type instead.
* Only applies if the preconfigured type mapper is used.
* Default true.
* @param useTypeHeaders false to ignore type headers.
* @since 2.2.8
*/
public void setUseTypeHeaders(boolean useTypeHeaders) {
可以通过属性配置为:
/**
* Kafka config property for using type headers (default true).
* @since 2.2.3
*/
public static final String USE_TYPE_INFO_HEADERS = "spring.json.use.type.headers";
在这种情况下,逻辑将是这样的:
this.typeMapper.setTypePrecedence(this.useTypeHeaders ? TypePrecedence.TYPE_ID : TypePrecedence.INFERRED);
这意味着反序列化的类型是从侦听器方法推断出来的。
在文档中查看更多信息:https://docs.spring.io/spring-kafka/reference/html/#json-serde
这个 ConsumerConfig 为我工作的详细信息可以在这里找到 - https://docs.spring.io/spring-kafka/reference/kafka/serdes.html
@Configuration
public class KafkaConsumerConfig {
@Bean
public Map<String, Object> consumerConfig() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
//props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey");
//props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue");
return props;
}
@Bean
public ConsumerFactory<String,Object> consumerFactory(){
return new DefaultKafkaConsumerFactory<>(consumerConfig());
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}