根据配置更改 Kafka 生产者类型的模式

问题描述 投票:0回答:1

我正在尝试进行一些配置,可以生成特定类型、字节数组与 JSON 等的 Kafka 消息输出。

目前,我有一个 Kafka 生产者,例如:

 @Bean
    public KafkaSender<String, SomePojo> kafkaSender() {
        final Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerVariable);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaJsonSerializer.class);
        final SenderOptions<String, SomePojo> senderOptions = SenderOptions.create(properties);
        return KafkaSender.create(senderOptions);
    }

这样就可以正常工作,即我们可以在目标 Kafka 主题中看到一个表示 SomePojo 的 JSON 的字符串。

现在,在另一个主题中,相同的 POJO 需要以字节为单位发送。

不知道该怎么做,我们复制了整个工作,只是改变了这一点:

    @Bean
    public KafkaSender<String, byte[]> kafkaSender() {
        final Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerVariable);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        final SenderOptions<String, byte[]> senderOptions = SenderOptions.create(properties);
        return KafkaSender.create(senderOptions);
    }

注意 SomePojo 与 byte[] 和 KafkaJsonSerializer 与 ByteArraySerializer 这也可以正常工作,现在将 Kafka 中的消息视为字节数组。

这很乏味,而且我相信这不是正确的方法(想象一下它不仅是字节数组与 JSON,还有图中的其他输出类型)。

有没有一种方法可以保留一个 bean,但使其可配置?

比如(编造的):

    @Bean
    public KafkaSender<String, {retrieve from some configuration}> kafkaSender() {
        final Map<String, Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServerVariable);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, {retrieve from some configuration});
        final SenderOptions<String, {retrieve from some configuration}> senderOptions = SenderOptions.create(properties);
        return KafkaSender.create(senderOptions);
    }
java apache-kafka spring-kafka reactor-kafka
1个回答
0
投票

对于每种唯一的对象类型或唯一的生产者配置,您将需要一个序列化器。

是的,你可以使用ByteArraySerialixer,然后在外部调用你自己的序列化函数,例如Jackson ObjectMapper writeBytes

同样,您可以使用 JsonNode 代替 POJO,它将接受任何有效的 JSON

© www.soinside.com 2019 - 2024. All rights reserved.