使用以下 Spring-Boot 属性
spring:
profiles:
active: "ssl"
kafka:
producer:
client-id: ${SPRING_KAFKA_PRODUCER_CLIENT_ID:kafka-producer}
bootstrap-servers: ${SPRING_KAFKA_PRODUCER_BOOTSTRAP_SERVERS:localhost:9092}
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer
发布的 JSON 值以空字节开头(十六进制的
00 00 00 00 0C 7B
)。
这会导致无效的 JSON 字符串(例如,当显示在 Offset-Explorer 中时)。
我使用的
@Configuration
类很简单
@Configuration
@EnableAutoConfiguration
@RequiredArgsConstructor
@PropertySources({
@PropertySource("classpath:kafka.yaml"),
@PropertySource("classpath:kafka-ssl.yaml"),
})
public class KafkaProducerConfiguration {
private final KafkaProperties kafkaProperties;
@Bean("producer-factory")
public <T> ProducerFactory<String, T> producerFactory() {
return new DefaultKafkaProducerFactory<>(this.kafkaProperties.getProducer().buildProperties(null));
}
@Bean("kafka-template")
public <T> KafkaTemplate<String, T> kafkaTemplate(ProducerFactory<String, T> producerFactory) {
return new KafkaTemplate<>(producerFactory);
}
}
然后我使用私有的最终 KafkaTemplate
@Service
@RequiredArgsConstructor
public class KafkaProducerServiceImpl<T> implements KafkaProducerService<T> {
private final KafkaTemplate<String, T> kafkaTemplate;
@Override
public void send(String topic, Function<T, String> key, T message) throws KafkaException {
this.kafkaTemplate.send(topic, key.apply(message), message);
}
}
我是否在配置属性中遗漏了某些内容?
这是该序列化程序所期望的。 Confluence 文档
导致无效的 JSON 字符串(例如在 Offset-Explorer 中显示时
Offset Explorer 在其 Registry 集成 中没有 JSONSchema 支持,我上次检查过。 Spring-Kafka 有自己的 JSONSerializer 类,假设您需要 JSON 数据而不使用架构注册表,您可以使用该类来代替 Confluence 中的任何内容。
使用模式注册表将始终发送带有起始空字节的二进制编码数据