使用kafka-avro-console-consumer消耗spring spring stream kafka发送的avro消息时出错

问题描述 投票:0回答:1

我尝试了官方schema-registry-confluent示例(消费者/制作人)与我本地安装的汇合4.0.0,它可以发送“传感器”avro消息发送帖子请求和收听时,但当我使用kafka-avro-console-consumer工具随附汇编4.0.0查看发送的avro消息,该工具引发了以下错误(a)。我还尝试使用kafka-avro-console-producer工具发送的avro消息(并且消息可以在kafka-avro-console-consumer工具中正确显示),它报告了以下错误(b)。给定的示例是否支持汇合4.0.0?非常感谢!

(a)kafka-avro-console-consumer error =>

ERROR运行使用者时出现未知错误:(kafka.tools.ConsoleConsumer $:107)org.apache.kafka.common.errors.SerializationException:错误反序列化ID为1的Avro消息引起:org.apache.kafka.common.errors。 SerializationException:未知的魔术字节!

(b)听众错误=>

org.springframework.messaging.converter.MessageConversionException:无法读取JSON:在#1字节#7处无效的UTF-32字符0x51473863(在0x0010ffff之上);嵌套异常是java.io.CharConversionException:org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:234)〜[spring]中字符#1,字节#7处的无效UTF-32字符0x51473863(大于0x0010ffff) -messaging-5.1.0.BUILD-SNAPSHOT.jar /:5.1.0.BUILD-快照]

avro spring-cloud-stream confluent confluent-schema-registry
1个回答
0
投票

每次有MessageConversionException时,它只是意味着提供的消息转换器无法转换消息。 。 。这是理所当然的,因为我们无法处理每一个场景,只能处理常见场景。因此,您应该实现自定义消息转换器(从头开始或扩展现有的一个)。以下是有关如何定义自定义Message Converter https://docs.spring.io/spring-cloud-stream/docs/Fishtown.M1/reference/htmlsingle/#spring-cloud-stream-overview-user-defined-message-converters的更多信息

© www.soinside.com 2019 - 2024. All rights reserved.