I'm using Spring Boot 2.7.5 and ran into a problem: my @KafkaListener throws a MessageConversionException. The full error log is below:
Bean [example.package.api.kafka.HelloKafkaListener@30e312a2]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [example.packageapi.ExampleEvent] to [example.packageapi.ExampleEvent] for GenericMessage [payload={"exampleField": "HELLO WORLD"}, headers={kafka_offset=37, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@3bb288b4, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=my-topic, kafka_receivedTimestamp=1678197991282, kafka_groupId=my-topic:HelloKafkaListener}], failedMessage=GenericMessage [payload={"exampleField": "HELLO WORLD"}, headers={kafka_offset=37, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@3bb288b4, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=my-topic, kafka_receivedTimestamp=1678197991282, kafka_groupId=my-topic:HelloKafkaListener}]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [example.packageapi.ExampleEvent] to [example.packageapi.ExampleEvent] for GenericMessage [payload={"exampleField": "HELLO WORLD"}, headers={kafka_offset=37, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@3bb288b4, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=my-topic, kafka_receivedTimestamp=1678197991282, kafka_groupId=my-topic:HelloKafkaListener}], failedMessage=GenericMessage [payload={"exampleField": "HELLO WORLD"}, headers={kafka_offset=37, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@3bb288b4, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=my-topic, kafka_receivedTimestamp=1678197991282, 
kafka_groupId=my-topic:HelloKafkaListener}]
Consumer configuration:
consumer:
  group-id: my-topic:HelloKafkaListener
  auto-offset-reset: latest
  key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
  value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
  properties:
    spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
    spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
    request.timeout.ms: 5000
The versions I'm using:
<confluent.version>7.3.0</confluent.version>
<avro.version>1.11.1</avro.version>
<spring.kafka.version>2.8.10</spring.kafka.version>
My pom.xml:
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-schema-registry-client</artifactId>
    <version>${confluent.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
        <exclusion>
            <groupId>io.swagger.core.v3</groupId>
            <artifactId>swagger-annotations</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>${avro.version}</version>
</dependency>
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>${confluent.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-streams-avro-serde</artifactId>
    <version>${confluent.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
        </exclusion>
    </exclusions>
</dependency>
My event.avsc looks like this:
{
  "namespace": "my.name.space",
  "type": "record",
  "name": "ExampleEvent",
  "doc": "A sample event",
  "fields": [
    {
      "name": "exampleField",
      "type": "string"
    }
  ]
}
I send a message from the terminal with the Confluent tools like this:
./kafka-avro-console-producer \
--broker-list localhost:59092 --topic my-topic \
--property schema.registry.url=http://kafka-schema:80 \
--property value.schema="$(cat ~/codebase/avro/event.avsc)"
{"exampleField":"HELLO WORLD"}
If I read the message back with
./kafka-avro-console-consumer
it works fine.
My listener looks like this:
public class HelloKafkaListener {
    private final HelloKafkaService helloKafkaService;

    @Autowired
    public HelloKafkaListener(HelloKafkaService helloKafkaService) {
        this.helloKafkaService = helloKafkaService;
    }

    @KafkaListener(
            topics = "my-topic",
            groupId = "my-topic:HelloKafkaListener"
    )
    // public void process(ConsumerRecord record) {
    //     log.info(record.value().toString()); // it works: {"exampleField": "HELLO WORLD"}
    // }
    public void process(@Payload ExampleEvent event) {
        this.helloKafkaService.handleMessage(event);
        log.info("Processing event: " + event.getExampleField());
    }
}
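If I declare the parameter as ConsumerRecord record, it works, and I can see that record.value().toString() is {"exampleField": "HELLO WORLD"}. But as soon as I switch to ExampleEvent event, it throws the exception above. Does anyone know why this happens? Thanks a lot!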
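Take a look at this article. You have to provide a message converter for your custom Java object ExampleEvent. On its own, @Payload will not convert the message into a custom type for you; out of the box it only handles String messages.
Try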
@Bean
public ConsumerFactory<String, ExampleEvent> exampleConsumerFactory() {
    // props: the Map<String, Object> of consumer properties, defined elsewhere
    return new DefaultKafkaConsumerFactory<>(
            props,
            new StringDeserializer(),
            new JsonDeserializer<>(ExampleEvent.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, ExampleEvent>
        exampleKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, ExampleEvent> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(exampleConsumerFactory());
    return factory;
}
Then register it on the listener via containerFactory:
@KafkaListener(
        topics = "my-topic",
        groupId = "my-topic:HelloKafkaListener",
        containerFactory = "exampleKafkaListenerContainerFactory" //<-------
)
public void process(ExampleEvent event) {
    this.helloKafkaService.handleMessage(event);
    log.info("Processing event: " + event.getExampleField());
}
Also make sure the following is already among your dependencies; if it isn't, add it manually:
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>
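One caveat on the JsonDeserializer approach above: the messages in the question are produced with kafka-avro-console-producer, so they are Avro-encoded rather than plain JSON, and a JSON deserializer may not be able to read them. As an alternative, here is a minimal configuration sketch that keeps KafkaAvroDeserializer and asks it to return the generated class instead of a GenericRecord. It rests on several assumptions that are not confirmed by the question: that ExampleEvent is the class generated from event.avsc, that the consumer block lives under spring.kafka in application.yml, and that the Schema Registry URL is the same one used in the producer command. If the deserializer is already configured this way elsewhere, this will not change anything.

```yaml
spring:
  kafka:
    consumer:
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
        spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
        # Assumption: same registry URL as in the kafka-avro-console-producer command
        schema.registry.url: http://kafka-schema:80
        # Ask the Confluent deserializer for the generated SpecificRecord class
        # (here assumed to be ExampleEvent) rather than a GenericRecord
        specific.avro.reader: true
```

With this in place the listener could keep the default container factory and the process(ExampleEvent event) signature, since the value handed to the listener would already be of the generated type.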