SpringBoot @KafkaListener 得到 MessageConversionException:无法从 A 转换为 B

问题描述 投票:0回答:1

我正在使用 SpringBoot 2.7.5 遇到了一个问题,@KafkaListener 得到了 MessageConversionException。整个错误日志如下所示:

Bean [example.package.api.kafka.HelloKafkaListener@30e312a2]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [example.packageapi.ExampleEvent] to [example.packageapi.ExampleEvent] for GenericMessage [payload={"exampleField": "HELLO WORLD"}, headers={kafka_offset=37, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@3bb288b4, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=my-topic, kafka_receivedTimestamp=1678197991282, kafka_groupId=my-topic:HelloKafkaListener}], failedMessage=GenericMessage [payload={"exampleField": "HELLO WORLD"}, headers={kafka_offset=37, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@3bb288b4, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=my-topic, kafka_receivedTimestamp=1678197991282, kafka_groupId=my-topic:HelloKafkaListener}]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [example.packageapi.ExampleEvent] to [example.packageapi.ExampleEvent] for GenericMessage [payload={"exampleField": "HELLO WORLD"}, headers={kafka_offset=37, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@3bb288b4, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=my-topic, kafka_receivedTimestamp=1678197991282, kafka_groupId=my-topic:HelloKafkaListener}], failedMessage=GenericMessage [payload={"exampleField": "HELLO WORLD"}, headers={kafka_offset=37, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@3bb288b4, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=my-topic, kafka_receivedTimestamp=1678197991282, kafka_groupId=my-topic:HelloKafkaListener}]

消费者配置:

consumer:
  group-id: my-topic:HelloKafkaListener
  auto-offset-reset: latest
  key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
  value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
  properties:
    spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
    spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
    request.timeout.ms: 5000

我使用的版本是:

<confluent.version>7.3.0</confluent.version>
<avro.version>1.11.1</avro.version> 
<spring.kafka.version>2.8.10</spring.kafka.version>

我的 pom.xml 文件:

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-schema-registry-client</artifactId>
    <version>${confluent.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
        <exclusion>
            <groupId>io.swagger.core.v3</groupId>
            <artifactId>swagger-annotations</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>${avro.version}</version>
</dependency>
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>${confluent.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-streams-avro-serde</artifactId>
    <version>${confluent.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
        </exclusion>
    </exclusions>
</dependency>

我的event.avsc是这样的:

{
  "namespace": "my.name.space",
  "type": "record",
  "name": "ExampleEvent",
  "doc": "A sample event",
  "fields": [
    {
      "name": "exampleField",
      "type": "string"
    }
  ]
}

我使用 confluent 工具在终端中发送消息是这样的:

    ./kafka-avro-console-producer \
    --broker-list localhost:59092 --topic my-topic \
    --property schema.registry.url=http://kafka-schema:80 \
    --property value.schema="$(cat ~/codebase/avro/event.avsc)"
    {"exampleField":"HELLO WORLD"}

如果使用

./kafka-avro-console-consumer
获取消息,效果很好。

我的听众是这样的:

public class HelloKafkaListener {
  private final HelloKafkaService helloKafkaService;
  
  @Autowired
  public HelloKafkaListener(HelloKafkaService helloKafkaService) {
    this.helloKafkaService = helloKafkaService;
  }
  
  @KafkaListener(
      topics = "my-topic",
      groupId = "my-topic:HelloKafkaListener"
  )
//  public void process(ConsumerRecord record) {
//    log.info(record.value().toString()); // it works: {"exampleField": "HELLO WORLD"}
//  }
  public void process(@Payload ExampleEvent event) {
    this.helloKafkaService.handleMessage(event);
    log.info("Processing event: " + event.getExampleField());
  }
}

如果我使用

ConsumerRecord record
,它可以工作我可以看到
record.value().toString()
{"exampleField": "HELLO WORLD"}
,但是切换到
ExampleEvent event
,它会抛出异常。任何人都知道为什么会这样?非常感谢!

spring-boot apache-kafka avro
1个回答
0
投票

查看此文章。您必须为您的自定义 Java 对象提供消息转换器

ExampleEvent
@Payload
本身只能接收
String
消息。

尝试

@Bean
public ConsumerFactory<String, Greeting> exampleConsumerFactory() {
    return new DefaultKafkaConsumerFactory<>(
      props,
      new StringDeserializer(), 
      new JsonDeserializer<>(ExampleEvent.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, ExampleEvent> 
  exampleKafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, ExampleEvent> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(exampleConsumerFactory());
    return factory;
}

然后通过

注册
      @KafkaListener(
      topics = "my-topic",
      groupId = "my-topic:HelloKafkaListener",
      containerFactory = "exampleKafkaListenerContainerFactory"  //<-------
  )
  public void process(ExampleEvent event) {
     this.helloKafkaService.handleMessage(event);
     log.info("Processing event: " + event.getExampleField());
 }

还要确保在你的依赖项中你已经有以下内容,否则手动添加它

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>
© www.soinside.com 2019 - 2024. All rights reserved.