嵌入式Kafka作为主题产生价值

问题描述 投票:0回答:1

也许你可以帮助我并告诉我在设置 EmbeddedKafka 时错过了什么

所以我在项目中使用EmbeddedKafka来测试我的@KafkaListener:

  1. 我创建了简单的测试
@SpringBootTest
@EmbeddedKafka(/*partitions = 1, ??? */ topics = {"topic-1", "topic-2"})
public class KafkaTest {...}
  1. 然后我初始化Producer
...
  private final EmbeddedKafkaBroker embeddedKafkaBroker;
  private Producer<String, KafkaDto> producer;

  public void prepare() {
    var props = KafkaTestUtils.producerProps(embeddedKafkaBroker);
    var cf = new DefaultKafkaProducerFactory<>(props, new StringSerializer(), kafkaDtoSerializer);
    producer = cf.createProducer();
  }
...
  1. 然后在我的测试中将测试消息发送到 KafkaListener。到目前为止,一切似乎都正常,没有任何问题。
    var kafkaDto = ...;
    return new ProducerRecord<>(kafkaConsumerProperties.getTopic(), "", kafkaDto);
  1. 我的消息来了
  @KafkaListener(topics = "#{'${spring.kafka.consumer.topic}'}",
                 groupId = "#{'${spring.kafka.consumer.group-id}'}",
                 containerFactory = "kafkaListenerContainerFactory")
  public void message(@Payload KafkaDto message) {...}
  1. 这就是 KafkaDto 反序列化器出现问题的地方。

Kafka配置

  @Bean
  public ConsumerFactory<String, KafkaDto> consumerFactoryDebezium(KafkaConsumerProperties kafkaConsumerProperties) {
    Map<String, Object> properties = new HashMap<>(kafkaProperties.buildConsumerProperties());
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);     properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,KafkaDtoDeserializer.class);
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerProperties.getGroupId());
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaConsumerProperties.isEnableAutoCommit());

    return new DefaultKafkaConsumerFactory<>(properties, new StringDeserializer(),
        new ErrorHandlingDeserializer<>(kafkaDtoDeserializer));
  }

KafkaDto反序列化器

@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaDtoDeserializer implements Deserializer<KafkaDto> {

  private final ObjectMapper mapper;

  @Override
  public KafkaDto deserialize(String topic, byte[] data) {}
}

由于某种原因,byte[]数据包含与字符串主题相同的值(字节中的“topic-1”)。而不是序列化的 KafkaDto 值。

我尝试运行应用程序并连接到真正的 Kafka。监听器工作正常。 由于某种原因,问题出在 EmbeddedKafka 上。 告诉我我错过了什么?

spring-boot spring-kafka spring-test embedded-kafka
1个回答
0
投票

谢谢@artem-bilan 你是对的,我序列化了主题而不是数据

   @Override
   public byte[] serialize(String topic, KafkaDto data) {
     try {
       return mapper.writeValueAsBytes(topic); <- My mistake
...

© www.soinside.com 2019 - 2024. All rights reserved.