Multiple Kafka producers in Spring Boot


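I have a use case that requires multiple Kafka producers, driven by configuration. For example, if my configuration lists 3 tenants that want to receive data, I want to start 3 producers, each writing to a different cluster.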

I tried configuring Kafka like this:


  @Bean
  @Primary
  public Map<String, KafkaTemplate<String, String>> kafkaTemplates() {
    log.info("setting up kafka templates");
    final String ids = configuration.getIds();
    Map<String, KafkaTemplate<String, String>> kafkaTemplates = new HashMap<>();

    for (String id : ids.split(",")) {
      kafkaTemplates.put(id, kafkaTemplate(id));
    }
    return kafkaTemplates;
  }

  private KafkaTemplate<String, String> kafkaTemplate(String id) {
    log.info("setting up kafka template");
    try {
      Properties producerProperties = new Properties();
      try (InputStream kins = Files.newInputStream(new File("/opt/user-secrets/", id + ".properties").toPath())) {
        producerProperties.load(kins);
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
      Map props = producerProperties;
      Map<String, Object> props1 = (Map<String, Object>) props;
      ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(props1);
      return new KafkaTemplate<>(producerFactory);
    } catch (Exception e) {
      log.error("e {}", e.getMessage(), e);
      throw new RuntimeException(e);
    }
  }
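Two details in the configuration above can be tightened with plain Java. This is a minimal sketch, not part of the original post: the class name ProducerConfigSupport and both method names are hypothetical. It assumes the ID list might contain spaces after the commas (which would produce map keys such as " tenantB") and that all producer settings are string-valued.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper; the class and method names are illustrative only.
final class ProducerConfigSupport {

  private ProducerConfigSupport() {}

  // Splits a comma-separated ID list, trimming whitespace and dropping empty
  // entries, so that "a, b" yields the key "b" rather than " b".
  static List<String> parseIds(String ids) {
    List<String> result = new ArrayList<>();
    if (ids == null) {
      return result;
    }
    for (String raw : ids.split(",")) {
      String id = raw.trim();
      if (!id.isEmpty()) {
        result.add(id);
      }
    }
    return result;
  }

  // Copies string-valued Properties into a Map<String, Object>
  // without going through a raw, unchecked cast.
  static Map<String, Object> toConfigMap(Properties properties) {
    Map<String, Object> config = new HashMap<>();
    for (String name : properties.stringPropertyNames()) {
      config.put(name, properties.getProperty(name));
    }
    return config;
  }

  public static void main(String[] args) {
    Properties p = new Properties();
    p.setProperty("bootstrap.servers", "localhost:9092"); // placeholder value
    System.out.println(parseIds("tenantA, tenantB"));
    System.out.println(toConfigMap(p));
  }
}
```

The map returned by toConfigMap could be passed to the DefaultKafkaProducerFactory constructor in place of props1, and parseIds could replace the bare ids.split(",") loop.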

This results in an NPE on kafkaTemplates.get(id) when I try to access the KafkaTemplate in my producer service:

@Service
@Slf4j
@Getter
@Setter
public class KafkaProducerService {


  private final Map<String, KafkaTemplate<String, String>> kafkaTemplates;

  private Schema avroSchema;

  public KafkaProducerService(Map<String, KafkaTemplate<String, String>> kafkaTemplates) {
    this.kafkaTemplates = kafkaTemplates;
  }

  public void produce(String id, String topic, String key, VehicleHeartbeat message) {
    GenericRecord record = generateAvroRecord(message);
    log.info("generic record: {}", record);
    kafkaTemplates.get(id).send(topic, key, record.toString());
  }

How can I do this with Spring Kafka?

java spring spring-boot spring-kafka
1 Answer

There may be a conflict with some other Map bean.
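One way to check which map was actually injected is to replace the bare get(id) call with a lookup that fails with a descriptive message. The sketch below is an illustration only: TenantLookup and require are hypothetical names, not Spring Kafka API.

```java
import java.util.Map;
import java.util.TreeSet;

// Hypothetical helper, not part of Spring or Spring Kafka.
final class TenantLookup {

  private TenantLookup() {}

  // Returns the entry for the given id, or throws an exception that lists
  // the keys the map really contains instead of failing later with an NPE.
  static <T> T require(Map<String, T> byId, String id) {
    T value = byId.get(id);
    if (value == null) {
      throw new IllegalArgumentException(
          "No entry for id '" + id + "'; available keys: " + new TreeSet<>(byId.keySet()));
    }
    return value;
  }
}
```

In the producer service this would read TenantLookup.require(kafkaTemplates, id).send(topic, key, record.toString()). If the reported keys are bean names rather than tenant IDs, a different map than the one built in kafkaTemplates() was injected.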

You can try replacing the @Bean annotation on the kafkaTemplates method with a named one, as shown below:

@Bean("myKafkaTemplatesMap")
@Primary
public Map<String, KafkaTemplate<String, String>> kafkaTemplates() {
    ...
}

Then qualify the constructor parameter in the service with the same name, which ensures that the correct bean is injected:

public KafkaProducerService(@Qualifier("myKafkaTemplatesMap") Map<String, KafkaTemplate<String, String>> kafkaTemplates) {
    this.kafkaTemplates = kafkaTemplates;
}
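An alternative worth considering, offered as a sketch rather than as part of this answer: Spring documents that a Map<String, T> injection point can be filled with all beans of type T keyed by bean name, so a hand-built map bean may be bypassed. Wrapping the map in a dedicated type avoids Map injection entirely. TenantTemplates is a hypothetical name; in the question's setup T would be KafkaTemplate<String, String>, and the wrapper would be returned from the @Bean method and injected into the service by its own type.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical wrapper type; plain Java, no Spring dependency.
final class TenantTemplates<T> {

  private final Map<String, T> byId;

  // Takes a defensive, read-only copy of the per-tenant map.
  TenantTemplates(Map<String, T> byId) {
    this.byId = Collections.unmodifiableMap(new HashMap<>(byId));
  }

  // Returns the entry for the tenant id, or null if the id is unknown.
  T get(String id) {
    return byId.get(id);
  }

  // Returns the tenant ids that were configured.
  Set<String> ids() {
    return byId.keySet();
  }
}
```

Because no other bean has this type, neither @Primary nor @Qualifier should be needed for the injection to be unambiguous.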