Unable to specify a valueSerializer instance for Spring KafkaTemplate

Question (votes: 0, answers: 1)

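I need to set a custom value serializer on Spring's KafkaTemplate. The value serializer looks like this: JsonSerializer<JourneyMailExchange> serializer = new JsonSerializer<>(customObjectMapper); (for example, in order to apply PropertyNamingStrategy.SNAKE_CASE).

With KafkaProducer this is easy to do: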

new KafkaProducer<>(properties, Serdes.String().serializer(), valueSerializer);

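But I could not find an equivalent option for KafkaTemplate. I only see the ProducerFactory property props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);, which is not what I am looking for: it takes a class, so there is no way to supply a specific, pre-configured instance.

We create the KafkaTemplate as follows: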
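To make the class-versus-instance distinction concrete, here is a minimal, library-free sketch. Nothing in it is Kafka or Spring API: ValueSerializer, ToyJsonSerializer, fromClass and toSnakeCase are hypothetical names invented for illustration. It assumes, as the question describes, that a class-based setting can only result in a serializer built through its no-argument constructor, so any customization passed to another constructor is lost.

```java
import java.util.function.UnaryOperator;

class SerializerInstanceSketch {

    /** Hypothetical stand-in for a value serializer (not the Kafka interface). */
    interface ValueSerializer {
        String serialize(String fieldName, String value);
    }

    /** Toy serializer whose output depends on state passed to its constructor. */
    static class ToyJsonSerializer implements ValueSerializer {
        private final UnaryOperator<String> namingStrategy;

        // The only constructor a class-based configuration can call.
        public ToyJsonSerializer() {
            this(UnaryOperator.identity());
        }

        // Customized construction, comparable to passing a custom ObjectMapper.
        public ToyJsonSerializer(UnaryOperator<String> namingStrategy) {
            this.namingStrategy = namingStrategy;
        }

        @Override
        public String serialize(String fieldName, String value) {
            return "{\"" + namingStrategy.apply(fieldName) + "\":\"" + value + "\"}";
        }
    }

    /** Converts simple camelCase names such as "journeyId" to snake_case. */
    static String toSnakeCase(String camelCase) {
        return camelCase.replaceAll("([a-z0-9])([A-Z])", "$1_$2").toLowerCase();
    }

    /** Mimics class-based configuration: instantiate reflectively, no arguments. */
    static ValueSerializer fromClass(Class<? extends ValueSerializer> type) {
        try {
            return type.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + type, e);
        }
    }

    public static void main(String[] args) {
        ValueSerializer byClass = fromClass(ToyJsonSerializer.class);
        ValueSerializer byInstance = new ToyJsonSerializer(SerializerInstanceSketch::toSnakeCase);

        System.out.println(byClass.serialize("journeyId", "42"));    // {"journeyId":"42"}
        System.out.println(byInstance.serialize("journeyId", "42")); // {"journey_id":"42"}
    }
}
```

The serializer created from the class falls back to the default naming, while the instance built by hand keeps its snake_case strategy. That is why an API that accepts a serializer instance is needed here.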

@Bean
public ProducerFactory<String, Object> producerFactory(KafkaProperties kafkaProperties) {
    return new DefaultKafkaProducerFactory<>(producerConfigs(kafkaProperties.getDefaultSettings()));
}

private static Map<String, Object> producerConfigs(Map<String, String> defaultSettings) {
    Map<String, Object> props = new HashMap<>(defaultSettings);
    props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return props;
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactory) {
    KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory);
    kafkaTemplate.setDefaultTopic("topicName");
    return kafkaTemplate;
}

I also found the following approach:

kafkaTemplate.setMessageConverter(new StringJsonMessageConverter(customObjectMapper));

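However, the conversion is applied only when we send Spring Message instances. If we call the send method with a key and a value, the provided converter is ignored.

spring-kafka version: 2.1.0.RELEASE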

java spring apache-kafka spring-kafka
1 answer (1 vote)

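In the end I worked out that this can be done by configuring multiple DefaultKafkaProducerFactory instances, each given its value serializer through the constructor: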

public DefaultKafkaProducerFactory(Map<String, Object> configs, Serializer<K> keySerializer,
        Serializer<V> valueSerializer)

The final version for one specific value serializer looks like this (you configure as many ProducerFactory beans as you need distinct value serializers):

@Bean
public ProducerFactory<String, Object> producerFactoryWithSnakeCaseValueSerializer(KafkaProperties kafkaProperties) {
    Map<String, Object> props = producerConfigsWithSnakeCaseValueSerializer(kafkaProperties.getDefaultSettings());
    JsonSerializer<Object> valueSerializer = new JsonSerializer<>(createObjectMapper(SNAKE_CASE));
    valueSerializer.setAddTypeInfo(false);
    return new DefaultKafkaProducerFactory<>(props, Serdes.String().serializer(), valueSerializer);
}

private static Map<String, Object> producerConfigsWithSnakeCaseValueSerializer(Map<String, String> defaultSettings) {
    Map<String, Object> props = new HashMap<>(defaultSettings);
    ...
    return props;
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> producerFactoryWithSnakeCaseValueSerializer) {
    // The parameter name matches the ProducerFactory bean name, so it can be resolved when several factories exist
    KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactoryWithSnakeCaseValueSerializer);
    kafkaTemplate.setDefaultTopic("topicName");
    return kafkaTemplate;
}