Maybe you can help me and tell me what I missed when setting up EmbeddedKafka.
I use EmbeddedKafka in my project to test my @KafkaListener:
@SpringBootTest
@EmbeddedKafka(/*partitions = 1, ??? */ topics = {"topic-1", "topic-2"})
public class KafkaTest {...}
...
private final EmbeddedKafkaBroker embeddedKafkaBroker;
private Producer<String, KafkaDto> producer;
public void prepare() {
var props = KafkaTestUtils.producerProps(embeddedKafkaBroker);
var cf = new DefaultKafkaProducerFactory<>(props, new StringSerializer(), kafkaDtoSerializer);
producer = cf.createProducer();
}
...
var kafkaDto = ...;
return new ProducerRecord<>(kafkaConsumerProperties.getTopic(), "", kafkaDto);
@KafkaListener(topics = "#{'${spring.kafka.consumer.topic}'}",
groupId = "#{'${spring.kafka.consumer.group-id}'}",
containerFactory = "kafkaListenerContainerFactory")
public void message(@Payload KafkaDto message) {...}
Kafka configuration
@Bean
public ConsumerFactory<String, KafkaDto> consumerFactoryDebezium(KafkaConsumerProperties kafkaConsumerProperties) {
Map<String, Object> properties = new HashMap<>(kafkaProperties.buildConsumerProperties());
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaDtoDeserializer.class);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerProperties.getGroupId());
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaConsumerProperties.isEnableAutoCommit());
return new DefaultKafkaConsumerFactory<>(properties, new StringDeserializer(),
new ErrorHandlingDeserializer<>(kafkaDtoDeserializer));
}
KafkaDto deserializer
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaDtoDeserializer implements Deserializer<KafkaDto> {
private final ObjectMapper mapper;
@Override
public KafkaDto deserialize(String topic, byte[] data) {}
}
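For some reason, byte[] data contains the same value as String topic (the bytes of "topic-1") rather than the serialized KafkaDto value.
I tried running the application against a real Kafka, and the listener works fine. For some reason the problem only shows up with EmbeddedKafka. What am I missing?
Thanks @artem-bilan, you were right: I was serializing the topic instead of the data.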
@Override
public byte[] serialize(String topic, KafkaDto data) {
try {
return mapper.writeValueAsBytes(topic); // <- my mistake: this should serialize data, not topic
...
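For anyone hitting the same symptom: in the real serializer the fix is simply to pass data instead of topic to mapper.writeValueAsBytes. Below is a minimal, dependency-free sketch of a check that would have caught this before the listener test. Everything in it is an assumption for illustration: the Serializer interface is a local stand-in that only mirrors the serialize(String topic, T data) shape of Kafka's interface, Dto is a hypothetical placeholder for KafkaDto, and plain UTF-8 bytes stand in for the JSON produced by ObjectMapper.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class SerializerRoundTrip {

    // Local stand-in mirroring the shape of Kafka's Serializer#serialize(topic, data).
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    // Hypothetical payload type standing in for KafkaDto.
    static final class Dto {
        final String id;

        Dto(String id) {
            this.id = id;
        }
    }

    // Reproduces the mistake: the topic name is encoded instead of the payload.
    static final Serializer<Dto> BUGGY =
            (topic, data) -> topic.getBytes(StandardCharsets.UTF_8);

    // Corrected version: the payload is encoded and the topic is ignored.
    static final Serializer<Dto> FIXED =
            (topic, data) -> data.id.getBytes(StandardCharsets.UTF_8);

    // Returns true when the serialized bytes are just the topic name.
    static boolean leaksTopic(Serializer<Dto> serializer, String topic, Dto dto) {
        byte[] actual = serializer.serialize(topic, dto);
        return Arrays.equals(actual, topic.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        Dto dto = new Dto("42");
        System.out.println("buggy leaks topic: " + leaksTopic(BUGGY, "topic-1", dto));
        System.out.println("fixed leaks topic: " + leaksTopic(FIXED, "topic-1", dto));
    }
}
```

A small unit test along these lines on the serializer alone is a lot quicker to debug than going through EmbeddedKafka, since the producer side in the test is the only place this serializer is used.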