卡夫卡消费者错误

问题描述 投票:2回答:2

我正在使用kafka制作人和Spring kafka消费者。我正在使用Json序列化器和反序列化器。每当我尝试从主题中读取消费者中的消息时,我会收到以下错误:

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition fan_topic-0 at offset 154. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided

我还没有在生产者和消费者中配置任何关于标题的内容。我在这里失踪了什么?

apache-kafka kafka-consumer-api kafka-producer-api spring-kafka
2个回答
4
投票

我相信你错过了JsonDeserializer必须在ConsumerFactory上使用适当的默认类型进行反序列化,而不是在Kafka属性中。

所有信息都显示在文档:https://docs.spring.io/spring-kafka/docs/2.1.7.RELEASE/reference/html/_reference.html#serdes


0
投票

只是添加到上面的答案,

以下变化为我解决了。

config.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);

加入

return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(String.class));

代替

return new DefaultKafkaConsumerFactory<String, String>(config);

以供参考,

deserialize下面的方法期待标题和“Assert.state..”抛出IllegalStateException

 @Override
        public T deserialize(String topic, Headers headers, byte[] data) {
            JavaType javaType = this.typeMapper.toJavaType(headers);
            if (javaType == null) {
                Assert.state(this.targetType != null, "No type information in headers and no default type provided");
                return deserialize(topic, data);
            }
            else {
                try {
                    return this.objectMapper.readerFor(javaType).readValue(data);
                }
                catch (IOException e) {
                    throw new SerializationException("Can't deserialize data [" + Arrays.toString(data) +
                            "] from topic [" + topic + "]", e);
                }
            }
        }
© www.soinside.com 2019 - 2024. All rights reserved.