原因:java.lang.IllegalStateException:标头中没有类型信息且未提供默认类型

问题描述 投票:0回答:1

在我的项目中,我们使用 Spring XML 配置(目前没有范围使用 Spring Boot),我希望通过良好的异常处理和重试机制来实现 Kafka 消费者。

此外,想探索开箱即用的异常处理和自定义异常处理,同样希望探索重试。

问题是我看不到 XML 片段和工作代码。

案例-1:我希望实现开箱即用的异常处理,下面是代码,但我收到垃圾数据的错误无限循环异常。

Caused by: org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition t-employee-0 at offset 4. If needed, please seek past the record to continue consumption.
    at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:309)
    at org.apache.kafka.clients.consumer.internals.CompletedFetch.fetchRecords(CompletedFetch.java:263)
    at org.apache.kafka.clients.consumer.internals.AbstractFetch.fetchRecords(AbstractFetch.java:340)
    at org.apache.kafka.clients.consumer.internals.AbstractFetch.collectFetch(AbstractFetch.java:306)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1235)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1186)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1666)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1641)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1439)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1330)
    ... 2 common frames omitted
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data  from topic [t-employee]
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:588)
    at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:73)
    at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:300)
    ... 12 common frames omitted

context.xml 文件

<bean id="employeeProducer" class="com.example.kafka.producer.EmployeeProducer" />

<bean id="defaultKafkaConsumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
    <constructor-arg>
        <map>
            <entry key="spring.json.trusted.packages" value="*" />
            <entry key="bootstrap.servers" value="localhost:9092"/>
            <entry key="auto.offset.reset" value="latest"/>
            <entry key="group.id" value="group1" />
            <entry key="client.id" value="my-client-id" />
            <entry key="max.poll.records" value="1"/>
            <entry key="key.deserializer" value="org.springframework.kafka.support.serializer.ErrorHandlingDeserializer" />
            <entry key="value.deserializer" value="org.springframework.kafka.support.serializer.ErrorHandlingDeserializer" />
            <entry key="spring.deserializer.key.delegate.class" value="org.springframework.kafka.support.serializer.JsonDeserializer" />
            <entry key="spring.deserializer.value.delegate.class" value="org.springframework.kafka.support.serializer.JsonDeserializer" />
            <entry key="value.class.name" value="com.example.kafka.model.Employee" />
            <entry key="spring.json.value.default.type" value="com.example.kafka.model.Employee" />

            <entry key="spring.kafka.producer.properties.spring.json.add.type.headers" value="false" />
        </map>
    </constructor-arg>
    <constructor-arg>
        <bean id="keyDeserializer" class="org.apache.kafka.common.serialization.StringDeserializer" />
    </constructor-arg>
    <constructor-arg>
        <bean id="valueDeserializer" class="org.springframework.kafka.support.serializer.JsonDeserializer" />
    </constructor-arg>
</bean>

<bean id="containerProperties" class="org.springframework.kafka.listener.ContainerProperties">
    <constructor-arg name="topics" value="t-employee"/>
    <property name="groupId" value="group1"/>
    <property name="messageListener" ref="myListener"  />
</bean>

<bean id="containerListener" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
    <constructor-arg index="0" ref="defaultKafkaConsumerFactory"/>
    <constructor-arg index="1" ref="containerProperties" />
</bean>

<bean id="myListener" class="com.example.kafka.listener.MyMessageListener" />

我不清楚 JsonDeserializer 是否也必须在 ConsumerFactory 和 Properties 上配置(对此不太清楚),或者我看不到在 XML 中执行此操作的方法,有人可以指导吗?

主要担心的是垃圾数据最终陷入无限循环。

java spring apache-kafka spring-kafka
1个回答
0
投票

不幸的是,我不完全确定您目前的情况,但我会尽力帮助您解决我认为相当清楚的问题。

从问题开始

我不清楚 JsonDeserializer 是否也必须在 ConsumerFactory 和 Properties 上配置 [...]

不必在两个地方都完成,这完全取决于您要使用哪个构造函数。请参阅 DefaultKafkaConsumerFactory 类文档 - 如果您不提供其他参数并将其保留在映射中,它将仅使用另一个构造函数。

之后事情开始变得棘手:我知道您想要调试为什么您的应用程序没有反序列化任何内容。根据您的情况,我会尝试考虑:

  1. 实现您自己的 CommonErrorHandler(请参阅docs),然后您将在上下文中将其注册为 bean(通过将其添加到 XML 中)。它是尝试处理某些特定类型的异常的标准方法。此外,如果您的上下文正在使用外部应用程序(例如,您仅添加到项目中并且无法查看其来源的 JAR),它仍然应该允许您自由地将其添加到上下文(因此添加到 Kafka Consumer) 。这里也有一个很好的例子来说明如何做到这一点。
  2. 但是,如果您无法选择添加此类处理程序,您仍然可以尝试使用 StringDeserializer (
  3. org.apache.kafka.common.serialization.StringDeserializer
    ) 甚至 Bytes 反序列化器来找到一些答案。您遇到的这个异常似乎表明您的应用程序无法从中生成 JSON,所以如果您可以尝试更通用的类型,它可能会为您指明正确的方向(也许它不是 JSON,而是 YML/XML?)
  4. 最后但并非最不重要的一点:有人写了这个主题,也许你可以寻求帮助?也许这看起来很愚蠢,但如果该主题使用了一些更复杂的数据类型(例如具有自己模式的
  5. Apache Avro,或者可能是一个压缩的主题,其中旧数据被逻辑删除),那么它要么从以下位置收集信息: 生产者或自己弄清楚它的漫长过程(如果正确查看字节,这是可以完成的,但让我们面对现实:我们不想这样做,对吧?)
这样或那样:祝你好运,希望你能尽快找到正确的格式!

© www.soinside.com 2019 - 2024. All rights reserved.