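I have a use case that requires multiple Kafka producers, driven by configuration. That is, if my configuration lists 3 tenants that want to receive data, I want to start 3 producers (each of the three writing to a different cluster).
I tried configuring Kafka like this: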
@Bean
@Primary
public Map<String, KafkaTemplate<String, String>> kafkaTemplates() {
    log.info("setting up kafka templates");
    final String ids = configuration.getIds();
    Map<String, KafkaTemplate<String, String>> kafkaTemplates = new HashMap<>();
    // One template per comma-separated tenant id
    for (String id : ids.split(",")) {
        kafkaTemplates.put(id, kafkaTemplate(id));
    }
    return kafkaTemplates;
}

private KafkaTemplate<String, String> kafkaTemplate(String id) {
    log.info("setting up kafka template");
    try {
        // Each tenant's producer settings live in /opt/user-secrets/<id>.properties
        Properties producerProperties = new Properties();
        try (InputStream kins = Files.newInputStream(new File("/opt/user-secrets/", id + ".properties").toPath())) {
            producerProperties.load(kins);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        // Unchecked cast from Properties to the Map type the factory expects
        Map props = producerProperties;
        Map<String, Object> props1 = (Map<String, Object>) props;
        ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(props1);
        return new KafkaTemplate<>(producerFactory);
    } catch (Exception e) {
        log.error("e {}", e.getMessage(), e);
        throw new RuntimeException(e);
    }
}
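As a side note on the `Properties` handling in `kafkaTemplate(String id)`: the raw `Map` cast only compiles with an unchecked warning. A minimal sketch of an explicit copy, using nothing beyond `java.util`; the `ProducerProps` class and `toConfigMap` method are hypothetical names for illustration, not part of Spring Kafka:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper (not part of Spring Kafka): copies string-valued
// properties into a Map<String, Object> without a raw cast.
final class ProducerProps {
    private ProducerProps() {}

    static Map<String, Object> toConfigMap(Properties properties) {
        Map<String, Object> config = new HashMap<>();
        // stringPropertyNames() returns only entries whose key and value are Strings
        for (String name : properties.stringPropertyNames()) {
            config.put(name, properties.getProperty(name));
        }
        return config;
    }
}
```

The resulting map could then be passed to `new DefaultKafkaProducerFactory<>(...)` in place of `props1`.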
When I try to access the KafkaTemplate in my producer service, this results in an NPE on kafkaTemplates.get(id):
@Service
@Slf4j
@Getter
@Setter
public class KafkaProducerService {
    private final Map<String, KafkaTemplate<String, String>> kafkaTemplates;
    private Schema avroSchema;

    public KafkaProducerService(Map<String, KafkaTemplate<String, String>> kafkaTemplates) {
        this.kafkaTemplates = kafkaTemplates;
    }

    public void produce(String id, String topic, String key, VehicleHeartbeat message) {
        GenericRecord record = generateAvroRecord(message);
        log.info("generic record: {}", record);
        kafkaTemplates.get(id).send(topic, key, record.toString());
    }
How can I do this with Spring Kafka?
There is probably a conflict with some other Map bean.
You can try replacing the @Bean annotation on the kafkaTemplates method with a named one, like this:
@Bean("myKafkaTemplatesMap")
@Primary
public Map<String, KafkaTemplate<String, String>> kafkaTemplates() {
    ...
}
Then qualify it with the same name in the service constructor, which ensures that the correct bean is injected:
public KafkaProducerService(@Qualifier("myKafkaTemplatesMap") Map<String, KafkaTemplate<String, String>> kafkaTemplates) {
    this.kafkaTemplates = kafkaTemplates;
}
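Whichever bean ends up injected, `Map.get` returns `null` for a key that is not present, and chaining `.send(...)` onto that is one likely way the NPE arises. A minimal sketch of a lookup that fails with a descriptive message instead; `TemplateLookup` and `require` are hypothetical names, and the value type is generic so that plain strings can stand in for `KafkaTemplate` instances:

```java
import java.util.Map;

// Hypothetical helper: look up a per-tenant template and fail fast with
// a message that lists the keys the injected map actually contains.
final class TemplateLookup {
    private TemplateLookup() {}

    static <T> T require(Map<String, T> templates, String id) {
        T template = templates.get(id);
        if (template == null) {
            // Printing the known keys shows whether the map holds tenant ids or something else
            throw new IllegalArgumentException(
                "No template for id '" + id + "'; known keys: " + templates.keySet());
        }
        return template;
    }
}
```

In `produce`, the call would then read `TemplateLookup.require(kafkaTemplates, id).send(topic, key, record.toString());`. If the listed keys turn out to be bean names rather than tenant ids, that would suggest Spring filled the map from individual `KafkaTemplate` beans instead of injecting the map bean itself.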