Spring Kafka Streams: type information lost


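I have set up a simple Spring Boot application that uses Kafka Streams. Spring Boot's auto-configuration for Kafka creates the factories. The messages are JSON messages with no message key. With the following configuration, deserialization fails with a "No type information in headers" error. Any input?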

spring.kafka.streams.properties.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.default.value.serde=org.springframework.kafka.support.serializer.JsonSerde
spring.kafka.consumer.properties.spring.json.key.default.type=java.lang.String
spring.kafka.consumer.properties.spring.json.value.default.type=com.abc.xyz.Person

Error

java.lang.IllegalStateException: No type information in headers and no default type provided
at org.springframework.util.Assert.state(Assert.java:73) ~[spring-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:353) ~[spring-kafka-2.2.4.RELEASE.jar:2.2.4.RELEASE]
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:63) ~[kafka-streams-2.0.1.jar:?]
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66) [kafka-streams-2.0.1.jar:?]
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:97) [kafka-streams-2.0.1.jar:?]
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117) [kafka-streams-2.0.1.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:677) [kafka-streams-2.0.1.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:943) [kafka-streams-2.0.1.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:831) [kafka-streams-2.0.1.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767) [kafka-streams-2.0.1.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736) [kafka-streams-2.0.1.jar:?]
1 answer

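Streams does not use the regular consumer properties.

Did some documentation lead you to believe that you need to set that property when using streams? If so, please open a GitHub issue so we can fix it.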

When using streams, the property is:

spring.kafka.streams.properties.spring.json.value.default.type=com.abc.xyz.Person

You don't need key.default.type, because you don't have a JSON key serde.
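
Putting the points above together, the corrected configuration might look like the sketch below. It assumes the same `com.abc.xyz.Person` class and the same serdes as in the question. The two `spring.kafka.consumer.properties.*` entries are dropped: the streams application does not read them, and the key is a plain String rather than JSON.

```properties
# Default serdes for the streams application (unchanged from the question)
spring.kafka.streams.properties.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties.default.value.serde=org.springframework.kafka.support.serializer.JsonSerde

# Default target type for JSON values, set under the streams prefix
# instead of spring.kafka.consumer.properties
spring.kafka.streams.properties.spring.json.value.default.type=com.abc.xyz.Person
```

Everything under `spring.kafka.streams.properties.` is handed to the Kafka Streams configuration, which is where the `JsonSerde` should pick up the default type when the incoming records carry no type headers.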
