使用 spring-kafka 实现多个具有不同配置细节的 kafka 生产者和消费者

问题描述 投票:0回答:1

我们有一个 API,可以连接到多个 kafka 主题来生成和使用 kafka 消息。我们使用 spring-kafka,除了主题名称(可以在运行时为消费者和生产者提供)之外,我们还有每个主题不同的另一个属性(用于序列化/反序列化 kafka 消息的加密密钥),这会引发以下问题:我们不能为所有主题重用相同的 kafka 生产者/kafka 消费者配置。

我们最初的方法是:

@Configuration
@EnableKafka
@RequiredArgsConstructor
public class KafkaProducerConfig {

    private final KafkaProperties properties;

    @Bean(name = TOPIC_1_PRODUCER)
    public KafkaTemplate<String, GenericRecord> kafkaTemplateTopic1() {
        Map<String, Object> producerProps = properties.buildProducerProperties();
        producerProps.put(ENCRYPTION_KEY, "encryption_key_for_topic_1");
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerProps));
    }

    @Bean(name = TOPIC_2_PRODUCER)
    public KafkaTemplate<String, GenericRecord> kafkaTemplateTopic2() {
        Map<String, Object> producerProps = properties.buildProducerProperties();
        producerProps.put(ENCRYPTION_KEY, "encryption_key_for_topic_2");
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerProps));
    }

    @Bean(name = TOPIC_3_PRODUCER)
    public KafkaTemplate<String, GenericRecord> kafkaTemplateTopic3() {
        Map<String, Object> producerProps = properties.buildProducerProperties();
        producerProps.put(ENCRYPTION_KEY, "encryption_key_for_topic_3");
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerProps));
    }
}

通过这种方法,我们需要为每个需要生成消息的新主题提供一个新的 KafkaTemplate bean,因此基本上每个主题一个 kafka 模板和关联的 kafkaTemplate bean 将被注入到旨在为该特定的 kafka 消息生成的服务中主题。

第二种方法是仅使用一个 KafkaTemplate bean(一个 kafka 生产者)并在运行时提供加密密钥。对于这个,我查看了是否有可能在运行时向 spring-kafka 中的生产者和消费者提供配置详细信息?并提供了以下实现:

@Configuration
@EnableKafka
@RequiredArgsConstructor
public class KafkaProducerConfig {

    private final KafkaProperties properties;

    @Bean
    public ProducerFactory<String, GenericRecord> producerFactory() {
        Map<String, Object> producerProps = properties.buildProducerProperties();
        return new DefaultKafkaProducerFactory<>(producerProps);
    }
    
}

出版商服务:


@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaProducer {

    private final ProducerFactory<String, GenericRecord> producerFactory;
    private final TopicResolver topicResolver;

    public void sendMessage(GenericRecord genericRecord) {
        TopicInfo topicInfo = topicResolver.resolve(genericRecord);

        Map<String, Object> configOverrides = new HashMap<>();
        configOverrides.put(ENCRYPTION_KEY, "encryption_key_for_resolved_topic");
        KafkaTemplate<String, GenericRecord> kafkaTemplate = new KafkaTemplate<>(producerFactory, configOverrides);

        ProducerRecord<String, GenericRecord> producerRecord = new ProducerRecord<>(topicInfo.getTopicName(), genericRecord);
        ListenableFuture<SendResult<String, GenericRecord>> listenableFuture = kafkaTemplate.send(producerRecord);
        listenableFuture.addCallback(new KafkaSendCallback<String, GenericRecord>() {
            @Override
            public void onSuccess(SendResult<String, GenericRecord> result) {
                log.info("Kafka message successfully sent");
            }

            @Override
            public void onFailure(@NonNull KafkaProducerException ex) {
                log.error("Kafka message failed to be sent with error {}", ex.getMessage());
            }
        });
    }
}

通过使用这种方法,将为每条生成的消息创建一个新的 KafkaTemplate 实例,我想这不是框架的目的,我不知道这会如何影响 kafka 性能整体。

您对所提出的两种方法有何看法?是否有任何其他可能的解决方案可以在使用一个 KakaTemplate bean/一个 kafka 生产者时在运行时提供配置?

消费者端我们也有同样的需求。

java spring-boot apache-kafka spring-kafka
1个回答
0
投票

您可以使用配置有

KafkaTemplate
的单个
DelegatingByTopicSerializer

https://docs.spring.io/spring-kafka/docs/current/reference/html/#by-topic

@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
    return new DefaultKafkaProducerFactory<>(config,
            null,
            new DelegatingByTopicSerializer(Map.of(
                    Pattern.compile("topic[0-4]"), new ByteArraySerializer(),
                    Pattern.compile("topic[5-9]"), new StringSerializer())),
                    new JsonSerializer<Object>());  // default
}
© www.soinside.com 2019 - 2024. All rights reserved.