我们有一个 API,可以连接到多个 kafka 主题来生成和使用 kafka 消息。我们使用 spring-kafka,除了主题名称(可以在运行时为消费者和生产者提供)之外,我们还有每个主题不同的另一个属性(用于序列化/反序列化 kafka 消息的加密密钥),这会引发以下问题:我们不能为所有主题重用相同的 kafka 生产者/kafka 消费者配置。
我们最初的方法是:
@Configuration
@EnableKafka
@RequiredArgsConstructor
public class KafkaProducerConfig {
private final KafkaProperties properties;
@Bean(name = TOPIC_1_PRODUCER)
public KafkaTemplate<String, GenericRecord> kafkaTemplateTopic1() {
Map<String, Object> producerProps = properties.buildProducerProperties();
producerProps.put(ENCRYPTION_KEY, "encryption_key_for_topic_1");
return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerProps));
}
@Bean(name = TOPIC_2_PRODUCER)
public KafkaTemplate<String, GenericRecord> kafkaTemplateTopic2() {
Map<String, Object> producerProps = properties.buildProducerProperties();
producerProps.put(ENCRYPTION_KEY, "encryption_key_for_topic_2");
return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerProps));
}
@Bean(name = TOPIC_3_PRODUCER)
public KafkaTemplate<String, GenericRecord> kafkaTemplateTopic3() {
Map<String, Object> producerProps = properties.buildProducerProperties();
producerProps.put(ENCRYPTION_KEY, "encryption_key_for_topic_3");
return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerProps));
}
}
通过这种方法,我们需要为每个需要生成消息的新主题提供一个新的 KafkaTemplate bean,因此基本上每个主题一个 kafka 模板和关联的 kafkaTemplate bean 将被注入到旨在为该特定的 kafka 消息生成的服务中主题。
第二种方法是仅使用一个 KafkaTemplate bean(一个 kafka 生产者)并在运行时提供加密密钥。对于这个,我查看了是否有可能在运行时向 spring-kafka 中的生产者和消费者提供配置详细信息?并提供了以下实现:
@Configuration
@EnableKafka
@RequiredArgsConstructor
public class KafkaProducerConfig {
private final KafkaProperties properties;
@Bean
public ProducerFactory<String, GenericRecord> producerFactory() {
Map<String, Object> producerProps = properties.buildProducerProperties();
return new DefaultKafkaProducerFactory<>(producerProps);
}
}
出版商服务:
@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaProducer {
private final ProducerFactory<String, GenericRecord> producerFactory;
private final TopicResolver topicResolver;
public void sendMessage(GenericRecord genericRecord) {
TopicInfo topicInfo = topicResolver.resolve(genericRecord);
Map<String, Object> configOverrides = new HashMap<>();
configOverrides.put(ENCRYPTION_KEY, "encryption_key_for_resolved_topic");
KafkaTemplate<String, GenericRecord> kafkaTemplate = new KafkaTemplate<>(producerFactory, configOverrides);
ProducerRecord<String, GenericRecord> producerRecord = new ProducerRecord<>(topicInfo.getTopicName(), genericRecord);
ListenableFuture<SendResult<String, GenericRecord>> listenableFuture = kafkaTemplate.send(producerRecord);
listenableFuture.addCallback(new KafkaSendCallback<String, GenericRecord>() {
@Override
public void onSuccess(SendResult<String, GenericRecord> result) {
log.info("Kafka message successfully sent");
}
@Override
public void onFailure(@NonNull KafkaProducerException ex) {
log.error("Kafka message failed to be sent with error {}", ex.getMessage());
}
});
}
}
通过使用这种方法,将为每条生成的消息创建一个新的 KafkaTemplate 实例,我想这不是框架的目的,我不知道这会如何影响 kafka 性能整体。
您对所提出的两种方法有何看法?是否有任何其他可能的解决方案可以在使用一个 KakaTemplate bean/一个 kafka 生产者时在运行时提供配置?
消费者端我们也有同样的需求。
您可以使用配置有
KafkaTemplate
的单个 DelegatingByTopicSerializer
。
https://docs.spring.io/spring-kafka/docs/current/reference/html/#by-topic
@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
return new DefaultKafkaProducerFactory<>(config,
null,
new DelegatingByTopicSerializer(Map.of(
Pattern.compile("topic[0-4]"), new ByteArraySerializer(),
Pattern.compile("topic[5-9]"), new StringSerializer())),
new JsonSerializer<Object>()); // default
}