在我的项目中,我们使用 Spring XML 配置(目前没有范围使用 Spring Boot),我希望通过良好的异常处理和重试机制来实现 Kafka 消费者。
此外,想探索开箱即用的异常处理和自定义异常处理,同样希望探索重试。
问题是我看不到 XML 片段和工作代码。
案例-1:我希望实现开箱即用的异常处理,下面是代码,但我收到垃圾数据的错误无限循环异常。
Caused by: org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition t-employee-0 at offset 4. If needed, please seek past the record to continue consumption.
at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:309)
at org.apache.kafka.clients.consumer.internals.CompletedFetch.fetchRecords(CompletedFetch.java:263)
at org.apache.kafka.clients.consumer.internals.AbstractFetch.fetchRecords(AbstractFetch.java:340)
at org.apache.kafka.clients.consumer.internals.AbstractFetch.collectFetch(AbstractFetch.java:306)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1235)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1186)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1666)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1641)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1439)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1330)
... 2 common frames omitted
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data from topic [t-employee]
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:588)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:73)
at org.apache.kafka.clients.consumer.internals.CompletedFetch.parseRecord(CompletedFetch.java:300)
... 12 common frames omitted
context.xml 文件
<bean id="employeeProducer" class="com.example.kafka.producer.EmployeeProducer" />
<bean id="defaultKafkaConsumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<map>
<entry key="spring.json.trusted.packages" value="*" />
<entry key="bootstrap.servers" value="localhost:9092"/>
<entry key="auto.offset.reset" value="latest"/>
<entry key="group.id" value="group1" />
<entry key="client.id" value="my-client-id" />
<entry key="max.poll.records" value="1"/>
<entry key="key.deserializer" value="org.springframework.kafka.support.serializer.ErrorHandlingDeserializer" />
<entry key="value.deserializer" value="org.springframework.kafka.support.serializer.ErrorHandlingDeserializer" />
<entry key="spring.deserializer.key.delegate.class" value="org.springframework.kafka.support.serializer.JsonDeserializer" />
<entry key="spring.deserializer.value.delegate.class" value="org.springframework.kafka.support.serializer.JsonDeserializer" />
<entry key="value.class.name" value="com.example.kafka.model.Employee" />
<entry key="spring.json.value.default.type" value="com.example.kafka.model.Employee" />
<entry key="spring.kafka.producer.properties.spring.json.add.type.headers" value="false" />
</map>
</constructor-arg>
<constructor-arg>
<bean id="keyDeserializer" class="org.apache.kafka.common.serialization.StringDeserializer" />
</constructor-arg>
<constructor-arg>
<bean id="valueDeserializer" class="org.springframework.kafka.support.serializer.JsonDeserializer" />
</constructor-arg>
</bean>
<bean id="containerProperties" class="org.springframework.kafka.listener.ContainerProperties">
<constructor-arg name="topics" value="t-employee"/>
<property name="groupId" value="group1"/>
<property name="messageListener" ref="myListener" />
</bean>
<bean id="containerListener" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
<constructor-arg index="0" ref="defaultKafkaConsumerFactory"/>
<constructor-arg index="1" ref="containerProperties" />
</bean>
<bean id="myListener" class="com.example.kafka.listener.MyMessageListener" />
我不清楚 JsonDeserializer 是否也必须在 ConsumerFactory 和 Properties 上配置(对此不太清楚),或者我看不到在 XML 中执行此操作的方法,有人可以指导吗?
主要担心的是垃圾数据最终陷入无限循环。
不幸的是,我不完全确定您目前的情况,但我会尽力帮助您解决我认为相当清楚的问题。
从问题开始
我不清楚 JsonDeserializer 是否也必须在 ConsumerFactory 和 Properties 上配置 [...]
不必在两个地方都完成,这完全取决于您要使用哪个构造函数。请参阅 DefaultKafkaConsumerFactory 类文档 - 如果您不提供其他参数并将其保留在映射中,它将仅使用另一个构造函数。
之后事情开始变得棘手:我知道您想要调试为什么您的应用程序没有反序列化任何内容。根据您的情况,我会尝试考虑:
org.apache.kafka.common.serialization.StringDeserializer
) 甚至 Bytes 反序列化器来找到一些答案。您遇到的这个异常似乎表明您的应用程序无法从中生成 JSON,所以如果您可以尝试更通用的类型,它可能会为您指明正确的方向(也许它不是 JSON,而是 YML/XML?)