Can we use multiple Kafka templates in Spring Boot?

Question · votes: 0 · answers: 3

In my Spring Boot Kafka publisher application, I want to support publishing messages either as String (JSON) or as bytes, because I want to support both JSON and Avro. But the Kafka template in Spring Boot only lets us define one of them. Is there a way to use both templates, or any other way to support both JSON and Avro?

KafkaTemplate<String, String>
works only for strings, but I also want to publish Avro, which would need something like
KafkaTemplate<String, byte[]>

apache-kafka spring-kafka kafka-producer-api
3 Answers

13 votes

You can try creating KafkaTemplates with different configurations:

@Bean
public ProducerFactory<String, String> producerFactoryString() {
    Map<String, Object> configProps = new HashMap<>();
    //additional config parameters .... 
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public ProducerFactory<String, byte[]> producerFactoryByte() {
    Map<String, Object> configProps = new HashMap<>();
    //additional config parameters ....
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // ByteArraySerializer matches the byte[] value type (BytesSerializer expects org.apache.kafka.common.utils.Bytes)
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplateString() {
    return new KafkaTemplate<>(producerFactoryString());
}

@Bean
public KafkaTemplate<String, byte[]> kafkaTemplateByte() {
    return new KafkaTemplate<>(producerFactoryByte());
}
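
Both templates can then be injected side by side (a minimal sketch; the service class, method names, and topic parameters below are illustrative assumptions, not part of the original answer). Spring resolves each KafkaTemplate by its generic type:

@Service
public class DualFormatPublisher {

    private final KafkaTemplate<String, String> jsonTemplate;
    private final KafkaTemplate<String, byte[]> avroTemplate;

    // each template is matched by its generic signature (String vs byte[] values)
    public DualFormatPublisher(KafkaTemplate<String, String> jsonTemplate,
                               KafkaTemplate<String, byte[]> avroTemplate) {
        this.jsonTemplate = jsonTemplate;
        this.avroTemplate = avroTemplate;
    }

    public void publishJson(String topic, String jsonPayload) {
        jsonTemplate.send(topic, jsonPayload);
    }

    // avroPayload is expected to be an already-serialized Avro record
    public void publishAvro(String topic, byte[] avroPayload) {
        avroTemplate.send(topic, avroPayload);
    }
}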

0 votes

You can create a separate Kafka configuration per destination. In my case I had to send data to 2 different servers.

@Configuration
public class KafkaConfig {

    private final MosaicKafkaConfig mosaicKafkaConfig;
    private final StreamKafkaConfig streamKafkaConfig;

    public KafkaConfig(MosaicKafkaConfig mosaicKafkaConfig, StreamKafkaConfig streamKafkaConfig) {
        this.mosaicKafkaConfig = mosaicKafkaConfig;
        this.streamKafkaConfig = streamKafkaConfig;
    }

    @Bean
    public ProducerFactory<?, ?> kafkaProducerFactoryForMosaic() {
        KafkaProperties kafkaProperties = new KafkaProperties();
        KafkaProperties.Ssl ssl = kafkaProperties.getSsl();
        ResourceLoader resourceLoader = new DefaultResourceLoader();
        Resource resource = resourceLoader.getResource(mosaicKafkaConfig.getSslTrustStoreLocation());
        ssl.setTrustStoreLocation(resource);
        ssl.setTrustStorePassword(mosaicKafkaConfig.getSslTrustStorePassword());
        ssl.setTrustStoreType(mosaicKafkaConfig.getSslTrustStoreType());
        Map<String, String> props = kafkaProperties.getProperties();
        props.put("sasl.jaas.config", mosaicKafkaConfig.getSaslConfig());
        props.put("sasl.mechanism", mosaicKafkaConfig.getSaslMechanism());
        props.put("security.protocol", mosaicKafkaConfig.getSaslSecProtocol());

        kafkaProperties.getProducer().setValueSerializer(mosaicKafkaConfig.getValaueSerializer());
        kafkaProperties.getProducer().setClientId(mosaicKafkaConfig.getClientID());
        kafkaProperties.getProducer().setBootstrapServers(mosaicKafkaConfig.getBootstrapServers());

        Map<String, Object> configProps = kafkaProperties.buildProducerProperties();
        return new DefaultKafkaProducerFactory<>(configProps);

    }

    @Bean
    public KafkaTemplate<?, ?> kafkaTemplateForMosaic(ProducerFactory<Object, Object> kafkaProducerFactoryForMosaic) {
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactoryForMosaic);
        return kafkaTemplate;
    }


    @Bean
    public ProducerFactory<?, ?> kafkaProducerFactoryForStream() {
        KafkaProperties kafkaProperties = new KafkaProperties();
        KafkaProperties.Ssl ssl = kafkaProperties.getSsl();
        ResourceLoader resourceLoader = new DefaultResourceLoader();
        Resource resource = resourceLoader.getResource(streamKafkaConfig.getSslTrustStoreLocation());
        ssl.setTrustStoreLocation(resource);
        ssl.setTrustStorePassword(streamKafkaConfig.getSslTrustStorePassword());
        ssl.setTrustStoreType(streamKafkaConfig.getSslTrustStoreType());
        Map<String, String> props = kafkaProperties.getProperties();
        props.put("sasl.jaas.config", streamKafkaConfig.getSaslConfig());
        props.put("sasl.mechanism", streamKafkaConfig.getSaslMechanism());
        props.put("security.protocol", streamKafkaConfig.getSaslSecProtocol());

        kafkaProperties.getProducer().setValueSerializer(streamKafkaConfig.getValaueSerializer());
        kafkaProperties.getProducer().setClientId(streamKafkaConfig.getClientID());
        kafkaProperties.getProducer().setBootstrapServers(streamKafkaConfig.getBootstrapServers());

        Map<String, Object> configProps = kafkaProperties.buildProducerProperties();
        return new DefaultKafkaProducerFactory<>(configProps);

    }

    @Bean
    public KafkaTemplate<?, ?> kafkaTemplateForStream(ProducerFactory<Object, Object> kafkaProducerFactoryForStream) {
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactoryForStream);
        return kafkaTemplate;
    }
}
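
The MosaicKafkaConfig and StreamKafkaConfig classes are not shown in this answer. A minimal sketch of what such a holder could look like, assuming a hypothetical mosaic.kafka property prefix and Lombok for the accessors (the field names are only inferred from the getters used above):

// Hypothetical sketch only; the prefix and field names are assumptions
@Configuration
@ConfigurationProperties(prefix = "mosaic.kafka")
@Getter
@Setter
public class MosaicKafkaConfig {
    private List<String> bootstrapServers;
    private String clientID;
    private Class<?> valaueSerializer;     // name kept to match the getter used above
    private String sslTrustStoreLocation;  // e.g. "classpath:mosaic-truststore.jks"
    private String sslTrustStorePassword;
    private String sslTrustStoreType;      // e.g. "JKS"
    private String saslConfig;             // value for sasl.jaas.config
    private String saslMechanism;          // e.g. "PLAIN"
    private String saslSecProtocol;        // e.g. "SASL_SSL"
}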

0 votes

Just extending @i.bondarenko's answer: if you want to configure everything from the properties file, you can write your own custom Kafka configuration to support multiple producer configurations, like this:

kafka:
  producer:
    bootstrap-servers: localhost:9092,localhost:9093,localhost:9094
    string-producer:
      retries: 0
      acks: all
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    byte-producer:
      retries: 0
      acks: all
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.BytesSerializer

Read the configuration in a configuration class:

@Configuration
@ConfigurationProperties(prefix = "kafka")
@Getter
@Setter
public class KafkaCustomProperties {
  private List<String> bootstrapServers = new ArrayList<>(Collections.singletonList("localhost:9092"));
  private String clientId;
  private Map<String, String> properties = new HashMap<>();
  private Map<String, KafkaProperties.Producer> producer;
  private Map<String, KafkaProperties.Consumer> consumer;
  private KafkaProperties.Ssl ssl = new KafkaProperties.Ssl();
  private KafkaProperties.Security security = new KafkaProperties.Security();

  public Map<String, Object> buildCommonProperties() {
    Map<String, Object> properties = new HashMap<>();
    if (this.bootstrapServers != null) {
      properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
    }
    if (this.clientId != null) {
      properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, this.clientId);
    }
    properties.putAll(this.ssl.buildProperties());
    properties.putAll(this.security.buildProperties());
    if (!CollectionUtils.isEmpty(this.properties)) {
      properties.putAll(this.properties);
    }
    return properties;
  }
}

Use this configuration to generate a KafkaTemplate bean for each producer, annotated with @Qualifier:

@Configuration
@RequiredArgsConstructor
@Slf4j
public class KafkaMultipleProducerConfig {

  private final KafkaCustomProperties kafkaCustomProperties;

  @Bean
  @Qualifier("producer-string")
  public KafkaTemplate<String, Object> producerStringKafkaTemplate() {
    return new KafkaTemplate<>(producerFactory("string-producer"));
  }

  @Bean
  @Qualifier("producer-byte")
  public KafkaTemplate<String, Object> producerByteKafkaTemplate() {
    return new KafkaTemplate<>(producerFactory("byte-producer"));
  }

  // looks up the per-producer properties by the key used in the YAML above
  private ProducerFactory<String, Object> producerFactory(String producerName) {
    Map<String, Object> properties = new HashMap<>(kafkaCustomProperties.buildCommonProperties());
    if (nonNull(kafkaCustomProperties.getProducer())) {
      KafkaProperties.Producer producerProperties = kafkaCustomProperties.getProducer().get(producerName);
      if (nonNull(producerProperties)) {
        properties.putAll(producerProperties.buildProperties());
      }
    }
    log.info("Kafka Producer '{}' properties: {}", producerName, properties);
    return new DefaultKafkaProducerFactory<>(properties);
  }
}
Use these KafkaTemplate beans to publish messages with the respective producer configurations.
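
For instance, one of the templates can be injected by its qualifier (a short sketch; the service class and topic are made up for illustration):

@Service
public class StringEventPublisher {

  private final KafkaTemplate<String, Object> stringTemplate;

  // picks the template built from the "string-producer" configuration
  public StringEventPublisher(@Qualifier("producer-string") KafkaTemplate<String, Object> stringTemplate) {
    this.stringTemplate = stringTemplate;
  }

  public void publish(String topic, String payload) {
    stringTemplate.send(topic, payload);
  }
}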

Refer to https://codingnconcepts.com/spring-boot/configure-multiple-kafka-producer/ for a detailed explanation.
