Spring boot kafka - 如何告诉 JsonDeserializer 忽略类型标头?

问题描述 投票:0回答:2

Spring 的 Kafka 生产者将类型标头嵌入到消息中,该标头指定消费者应将消息反序列化到哪个类。当生产者不使用 Spring Kafka 而消费者使用时,这是一个问题。在这种情况下,JsonDeserializer 无法反序列化消息并会抛出异常“标头中没有类型信息并且没有提供默认类型”。
解决此问题的一种方法是设置默认的反序列化类型。这在单个主题包含多个消息架构的情况下不起作用。 我发现的另一个解决方案是设置

spring.kafka.consumer.properties.spring.json.use.type.headers

为 false(在 application.properties 文件中)。这不会执行任何操作,因为再次抛出相同的异常。
如何确保 JsonDeserializer 忽略类型标头?

json spring-boot apache-kafka spring-kafka
2个回答
1
投票

查看该解串器的此选项:

/**
 * Set to false to ignore type information in headers and use the configured
 * target type instead.
 * Only applies if the preconfigured type mapper is used.
 * Default true.
 * @param useTypeHeaders false to ignore type headers.
 * @since 2.2.8
 */
public void setUseTypeHeaders(boolean useTypeHeaders) {

可以通过属性配置为:

/**
 * Kafka config property for using type headers (default true).
 * @since 2.2.3
 */
public static final String USE_TYPE_INFO_HEADERS = "spring.json.use.type.headers";

在这种情况下,逻辑将是这样的:

this.typeMapper.setTypePrecedence(this.useTypeHeaders ? TypePrecedence.TYPE_ID : TypePrecedence.INFERRED);

这意味着反序列化的类型是从侦听器方法推断出来的。

在文档中查看更多信息:https://docs.spring.io/spring-kafka/reference/html/#json-serde


0
投票

这个 ConsumerConfig 为我工作的详细信息可以在这里找到 - https://docs.spring.io/spring-kafka/reference/kafka/serdes.html

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public Map<String, Object> consumerConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        //props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey");
        //props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue");
        return props;
    }

    @Bean
    public ConsumerFactory<String,Object> consumerFactory(){
        return new DefaultKafkaConsumerFactory<>(consumerConfig());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.