在我的 Spring Boot Kafka 发布者应用程序中,我希望同时支持以 String(JSON)格式和字节格式发布消息,因为我想同时支持 JSON 和 Avro。但是 Spring Boot 中的 KafkaTemplate 似乎只允许定义其中一种模板。有没有办法同时使用两种模板,或者通过其他方式同时支持 JSON 和 Avro?
KafkaTemplate<String, String>
仅适用于字符串,但我也想发布avro,它应该类似于KafkaTemplate<String, byte[]>
您可以尝试使用不同的配置创建 KafkaTemplate:
/**
 * Builds the producer factory used for String-valued records.
 *
 * <p>Both the record key and the record value are serialized with
 * {@code StringSerializer}.
 *
 * @return a producer factory configured for {@code String} keys and values
 */
@Bean
public ProducerFactory<String, String> producerFactoryString() {
    final Map<String, Object> settings = new HashMap<>();
    // Additional config parameters (not shown in this snippet) go here.
    settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return new DefaultKafkaProducerFactory<>(settings);
}
/**
 * Builds the producer factory used for raw {@code byte[]} records
 * (for example pre-encoded Avro payloads).
 *
 * <p>Keys are serialized with {@code StringSerializer}. Values are serialized
 * with {@code ByteArraySerializer}, which is the serializer for {@code byte[]}.
 * {@code BytesSerializer} must not be used here: it serializes Kafka's
 * {@code org.apache.kafka.common.utils.Bytes} wrapper type, so sending a
 * {@code byte[]} through it fails with a {@code ClassCastException}.
 *
 * @return a producer factory configured for {@code String} keys and
 *         {@code byte[]} values
 */
@Bean
public ProducerFactory<String, byte[]> producerFactoryByte() {
    Map<String, Object> configProps = new HashMap<>();
    // Additional config parameters (not shown in this snippet) go here.
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // Fully qualified so that no extra import is needed in the surrounding file.
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            org.apache.kafka.common.serialization.ByteArraySerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}
/**
 * Exposes the template for publishing String-valued records.
 *
 * @return a template backed by {@link #producerFactoryString()}
 */
@Bean
public KafkaTemplate<String, String> kafkaTemplateString() {
    final ProducerFactory<String, String> stringFactory = producerFactoryString();
    return new KafkaTemplate<>(stringFactory);
}
/**
 * Exposes the template for publishing {@code byte[]}-valued records.
 *
 * @return a template backed by {@link #producerFactoryByte()}
 */
@Bean
public KafkaTemplate<String, byte[]> kafkaTemplateByte() {
    final ProducerFactory<String, byte[]> byteFactory = producerFactoryByte();
    return new KafkaTemplate<>(byteFactory);
}
您可以创建 Kafka 配置。我必须将数据发送到 2 个不同的服务器。
/**
 * Declares a separate producer factory and {@code KafkaTemplate} for each of
 * two targets, "Mosaic" and "Stream". Each factory is assembled from its own
 * settings object: trust store, SASL properties, value serializer, client id
 * and bootstrap servers.
 */
@Configuration
public class KafkaConfig {

    private final MosaicKafkaConfig mosaicKafkaConfig;
    private final StreamKafkaConfig streamKafkaConfig;

    public KafkaConfig(MosaicKafkaConfig mosaicKafkaConfig, StreamKafkaConfig streamKafkaConfig) {
        this.mosaicKafkaConfig = mosaicKafkaConfig;
        this.streamKafkaConfig = streamKafkaConfig;
    }

    /**
     * Creates the producer factory for the Mosaic target from a fresh
     * {@code KafkaProperties} instance populated with the Mosaic settings.
     */
    @Bean
    public ProducerFactory<?, ?> kafkaProducerFactoryForMosaic() {
        KafkaProperties settings = new KafkaProperties();

        // Trust store: location is resolved to a Resource through a DefaultResourceLoader.
        KafkaProperties.Ssl trust = settings.getSsl();
        trust.setTrustStoreLocation(
                new DefaultResourceLoader().getResource(mosaicKafkaConfig.getSslTrustStoreLocation()));
        trust.setTrustStorePassword(mosaicKafkaConfig.getSslTrustStorePassword());
        trust.setTrustStoreType(mosaicKafkaConfig.getSslTrustStoreType());

        // SASL / security settings are passed through as plain client properties.
        Map<String, String> extra = settings.getProperties();
        extra.put("sasl.jaas.config", mosaicKafkaConfig.getSaslConfig());
        extra.put("sasl.mechanism", mosaicKafkaConfig.getSaslMechanism());
        extra.put("security.protocol", mosaicKafkaConfig.getSaslSecProtocol());

        // Producer-specific settings.
        KafkaProperties.Producer producer = settings.getProducer();
        producer.setValueSerializer(mosaicKafkaConfig.getValaueSerializer());
        producer.setClientId(mosaicKafkaConfig.getClientID());
        producer.setBootstrapServers(mosaicKafkaConfig.getBootstrapServers());

        return new DefaultKafkaProducerFactory<>(settings.buildProducerProperties());
    }

    /**
     * Wraps the Mosaic producer factory in a template.
     *
     * <p>NOTE(review): two {@code ProducerFactory} beans are declared in this
     * class; the parameter name presumably selects the bean of the same name.
     * Confirm before renaming the parameter.
     */
    @Bean
    public KafkaTemplate<?, ?> kafkaTemplateForMosaic(ProducerFactory<Object, Object> kafkaProducerFactoryForMosaic) {
        return new KafkaTemplate<Object, Object>(kafkaProducerFactoryForMosaic);
    }

    /**
     * Creates the producer factory for the Stream target from a fresh
     * {@code KafkaProperties} instance populated with the Stream settings.
     */
    @Bean
    public ProducerFactory<?, ?> kafkaProducerFactoryForStream() {
        KafkaProperties settings = new KafkaProperties();

        // Trust store: location is resolved to a Resource through a DefaultResourceLoader.
        KafkaProperties.Ssl trust = settings.getSsl();
        trust.setTrustStoreLocation(
                new DefaultResourceLoader().getResource(streamKafkaConfig.getSslTrustStoreLocation()));
        trust.setTrustStorePassword(streamKafkaConfig.getSslTrustStorePassword());
        trust.setTrustStoreType(streamKafkaConfig.getSslTrustStoreType());

        // SASL / security settings are passed through as plain client properties.
        Map<String, String> extra = settings.getProperties();
        extra.put("sasl.jaas.config", streamKafkaConfig.getSaslConfig());
        extra.put("sasl.mechanism", streamKafkaConfig.getSaslMechanism());
        extra.put("security.protocol", streamKafkaConfig.getSaslSecProtocol());

        // Producer-specific settings.
        KafkaProperties.Producer producer = settings.getProducer();
        producer.setValueSerializer(streamKafkaConfig.getValaueSerializer());
        producer.setClientId(streamKafkaConfig.getClientID());
        producer.setBootstrapServers(streamKafkaConfig.getBootstrapServers());

        return new DefaultKafkaProducerFactory<>(settings.buildProducerProperties());
    }

    /**
     * Wraps the Stream producer factory in a template.
     *
     * <p>NOTE(review): two {@code ProducerFactory} beans are declared in this
     * class; the parameter name presumably selects the bean of the same name.
     * Confirm before renaming the parameter.
     */
    @Bean
    public KafkaTemplate<?, ?> kafkaTemplateForStream(ProducerFactory<Object, Object> kafkaProducerFactoryForStream) {
        return new KafkaTemplate<Object, Object>(kafkaProducerFactoryForStream);
    }
}
这里只是对 @i.bondarenko 的答案做一点扩展:如果您想通过属性文件配置所有属性,可以编写自己的自定义 Kafka 配置来支持多个生产者配置,如下所示:
# Custom multi-producer Kafka settings, all nested under the "kafka" prefix.
kafka:
  # Broker list shared by every producer defined below.
  bootstrap-servers: localhost:9092,localhost:9093,localhost:9094
  # Named producer configurations; each key is the name a producer is looked up by.
  producer:
    # Producer for String (e.g. JSON) payloads.
    string-producer:
      retries: 0
      acks: all
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    # Producer for raw byte[] (e.g. Avro) payloads.
    byte-producer:
      retries: 0
      acks: all
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # ByteArraySerializer handles byte[] values; BytesSerializer would instead
      # expect Kafka's org.apache.kafka.common.utils.Bytes wrapper type.
      value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
在配置类中读取上述配置:
/**
 * Holder for settings bound from the {@code kafka} configuration prefix.
 *
 * <p>Besides the shared client settings, it keeps one
 * {@code KafkaProperties.Producer} / {@code KafkaProperties.Consumer} entry per
 * name, so several differently configured clients can be described side by side.
 * Getters and setters are generated by Lombok.
 */
@Configuration
@ConfigurationProperties(prefix = "kafka")
@Getter
@Setter
public class KafkaCustomProperties {

    /** Broker addresses; defaults to a single local broker. */
    private List<String> bootstrapServers = new ArrayList<>(Collections.singletonList("localhost:9092"));

    /** Client id added to the common properties when set. */
    private String clientId;

    /** Free-form client properties, applied last in {@link #buildCommonProperties()}. */
    private Map<String, String> properties = new HashMap<>();

    /** Producer settings keyed by producer name. */
    private Map<String, KafkaProperties.Producer> producer;

    /** Consumer settings keyed by consumer name. */
    private Map<String, KafkaProperties.Consumer> consumer;

    private KafkaProperties.Ssl ssl = new KafkaProperties.Ssl();

    private KafkaProperties.Security security = new KafkaProperties.Security();

    /**
     * Collects the settings that apply to every client.
     *
     * <p>Entries are added in this order, later ones overwriting earlier ones on
     * key collision: bootstrap servers, client id, SSL, security, then the
     * free-form {@code properties} map.
     *
     * @return a new mutable map of common client properties
     */
    public Map<String, Object> buildCommonProperties() {
        final Map<String, Object> common = new HashMap<>();

        if (bootstrapServers != null) {
            common.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        }
        if (clientId != null) {
            common.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId);
        }

        common.putAll(ssl.buildProperties());
        common.putAll(security.buildProperties());

        final boolean hasExtras = !CollectionUtils.isEmpty(properties);
        if (hasExtras) {
            common.putAll(properties);
        }
        return common;
    }
}
使用此配置,为每个生产者生成带有 @Qualifier 注解的 KafkaTemplate bean:
/**
 * Declares one {@code KafkaTemplate} bean per named producer configuration
 * held by {@link KafkaCustomProperties}.
 *
 * <p>The producer names used here must match the keys under
 * {@code kafka.producer} in the application configuration
 * ({@code string-producer} and {@code byte-producer}); otherwise the
 * producer-specific settings such as the serializers are never applied.
 */
@Configuration
@RequiredArgsConstructor
@Slf4j
public class KafkaMultipleProducerConfig {

    /** Key of the String producer section under {@code kafka.producer}. */
    private static final String STRING_PRODUCER = "string-producer";

    /** Key of the byte producer section under {@code kafka.producer}. */
    private static final String BYTE_PRODUCER = "byte-producer";

    private final KafkaCustomProperties kafkaCustomProperties;

    /**
     * Template built from the {@code string-producer} configuration.
     *
     * @return the template qualified as {@code producer-string}
     */
    @Bean
    @Qualifier("producer-string")
    public KafkaTemplate<String, Object> producerStringKafkaTemplate() {
        return new KafkaTemplate<>(producerFactory(STRING_PRODUCER));
    }

    /**
     * Template built from the {@code byte-producer} configuration.
     *
     * @return the template qualified as {@code producer-byte}
     */
    @Bean
    @Qualifier("producer-byte")
    public KafkaTemplate<String, Object> producerByteKafkaTemplate() {
        return new KafkaTemplate<>(producerFactory(BYTE_PRODUCER));
    }

    /**
     * Builds a producer factory from the common properties overlaid with the
     * properties of the named producer, when such a producer is configured.
     *
     * @param producerName key of the producer section to apply
     * @return a producer factory for the merged properties
     */
    private ProducerFactory<String, Object> producerFactory(String producerName) {
        Map<String, Object> properties = new HashMap<>(kafkaCustomProperties.buildCommonProperties());

        KafkaProperties.Producer producerProperties = null;
        if (nonNull(kafkaCustomProperties.getProducer())) {
            producerProperties = kafkaCustomProperties.getProducer().get(producerName);
        }

        if (nonNull(producerProperties)) {
            // Producer-specific entries override the common ones on key collision.
            properties.putAll(producerProperties.buildProperties());
        } else {
            // Make a missing or misnamed section visible instead of silently
            // continuing with the common properties only.
            log.warn("No Kafka producer configuration found for '{}'; using common properties only",
                    producerName);
        }

        // NOTE(review): this logs the full property map, which may include SSL/SASL
        // secrets if any are configured — consider masking before production use.
        log.info("Kafka Producer '{}' properties: {}", producerName, properties);
        return new DefaultKafkaProducerFactory<>(properties);
    }
}
然后使用这些 KafkaTemplate bean,按不同的生产者配置发布消息。详细解释请参考这篇文章:
https://codingnconcepts.com/spring-boot/configure-multiple-kafka-producer/