将 ObjectMapper 注入 Spring Kafka 序列化器/反序列化器

问题描述 投票:0回答:3

我正在使用 Spring Kafka 1.1.2-RELEASE 和 Spring Boot 1.5.0 RC,并且我配置了一个扩展

org.springframework.kafka.support.serializer.JsonSerializer
/
org.springframework.kafka.support.serializer.JsonDeserializer
的自定义值序列化器/反序列化器类。这些类确实使用 Jackson
ObjectMapper
,可以通过构造函数提供。

是否有可能从我的 Spring 上下文中注入

ObjectMapper
?我已经配置了一个
ObjectMapper
,我想在序列化器/解串器中重用它。

java spring spring-boot spring-kafka
3个回答
15
投票

您可以将

JsonSerializer
JsonDeserializer
配置为
@Bean
。 向它们注入所需的
ObjectMapper
。并在
DefaultKafkaProducerFactory
DefaultKafkaConsumerFactory
bean 定义中使用这些 bean:

    @Bean
    public ProducerFactory<Integer, String> producerFactory() {
        DefaultKafkaProducerFactory<Integer, String> producerFactory = 
                new DefaultKafkaProducerFactory<>(producerConfigs());
        producerFactory.setValueSerializer(jsonSerializer());
        return producerFactory;
    }

5
投票
@Component
public class ObjectMapperProducerFactoryCustomizer implements DefaultKafkaProducerFactoryCustomizer {

    private final ObjectMapper objectMapper;

    public ObjectMapperProducerFactoryCustomizer(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }

    @Override
    public void customize(DefaultKafkaProducerFactory<?, ?> producerFactory) {
        if (Objects.nonNull(producerFactory)) {
            producerFactory.setValueSerializer(new JsonSerializer<>(objectMapper));
        }
    }

}

1
投票

对于反应式非阻塞 kafka 客户端,配置如下:

@Configuration
@EnableConfigurationProperties(KafkaProperties.class)
public class KafkaConfig {

    @Bean
    public ReactiveKafkaProducerTemplate<String, Object> kafkaProducerTemplate(KafkaProperties p,
                                                                               ObjectMapper objectMapper) {
        return new ReactiveKafkaProducerTemplate<>(
                SenderOptions.<String, Object>create(p.buildProducerProperties())
                        .withValueSerializer(new JsonSerializer<>(objectMapper)));
    }
}

以上暗示您正在使用

io.projectreactor.kafka
dep,

dependencies {
    compile "io.projectreactor.kafka:reactor-kafka"
}
© www.soinside.com 2019 - 2024. All rights reserved.