Spring Kafka error: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer'


Producer properties

spring.kafka.producer.bootstrap-servers=127.0.0.1:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

Consumer properties

spring.kafka.consumer.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.group-id=user-group
server.port=8085

Consumer service

@Service
public class UserConsumerService {

    @KafkaListener(topics = { "user-topic" })
    public void consumerUserData(User user) {
        System.out.println("Users Age Is: " + user.getAge() + " Fav Genre " + user.getFavGenre());
    }
}

Producer service

@Service
public class UserProducerService {

    @Autowired
    private KafkaTemplate<String, User> kafkaTemplate;

    public void sendUserData(User user) {
        kafkaTemplate.send("user-topic", user.getName(), user);
    }
}

Producer configuration used to create the topic

@Configuration
public class KafkaConfig {

    @Bean
    public NewTopic topicOrder() {
        return TopicBuilder.name("user-topic").partitions(2).replicas(1).build();
    }
}

The producer works fine, but the consumer fails with the following error

2021-12-06 21:45:50.299 ERROR 4936 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Consumer exception

java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
    at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:149) ~[spring-kafka-2.8.0.jar:2.8.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1760) ~[spring-kafka-2.8.0.jar:2.8.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1283) ~[spring-kafka-2.8.0.jar:2.8.0]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: org.apache.kafka.common.errors.RecordDeserializationException: Error deserializing key/value for partition user-topic-0 at offset 1. If needed, please seek past the record to continue consumption.
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1429) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:134) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1652) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1800(Fetcher.java:1488) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:721) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:672) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1277) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238) ~[kafka-clients-3.0.0.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211) ~[kafka-clients-3.0.0.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollConsumer(KafkaMessageListenerContainer.java:1507) ~[spring-kafka-2.8.0.jar:2.8.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1497) ~[spring-kafka-2.8.0.jar:2.8.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1325) ~[spring-kafka-2.8.0.jar:2.8.0]

I am new to Kafka and am trying to work out why this error occurs. How can I fix it?

java spring-boot apache-kafka spring-kafka
2 Answers
19 votes

Does the error message not tell you anything?

This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer

See the documentation: https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handling-deserializer

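When a deserializer fails to deserialize a message, Spring has no way to handle the problem, because it occurs before poll() returns. To solve this, the ErrorHandlingDeserializer was introduced. This deserializer delegates to a real deserializer (key or value). If the delegate fails to deserialize the record content, the ErrorHandlingDeserializer returns a null value and a DeserializationException in a header that contains the cause and the raw bytes. When you use a record-level MessageListener, if the ConsumerRecord contains a DeserializationException header for either the key or the value, the container's ErrorHandler is called with the failed ConsumerRecord. The record is not passed to the listener.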
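To make the delegation idea in that paragraph concrete, here is a minimal plain-Java sketch. It is only a simplified model and not Spring Kafka's implementation: the class `ErrorHandlingSketch`, the `Result` type, the two header names, and the toy `parseInt` delegate are all made up for illustration. It shows the core behaviour described above: the wrapper catches the delegate's exception, returns a null value, and keeps the cause and the raw bytes in headers so that processing can continue.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class ErrorHandlingSketch {

    // Hypothetical header names, chosen for this sketch only.
    static final String ERROR_HEADER = "sketch.deserializer.exception";
    static final String RAW_HEADER = "sketch.raw.bytes";

    /** Simplified stand-in for a consumer record: a value plus headers. */
    static final class Result<T> {
        final T value; // null when the delegate failed
        final Map<String, Object> headers = new HashMap<>();

        Result(T value) {
            this.value = value;
        }
    }

    /** Calls the delegate; on failure returns null plus error headers instead of throwing. */
    static <T> Result<T> deserialize(Function<byte[], T> delegate, byte[] data) {
        try {
            return new Result<>(delegate.apply(data));
        } catch (RuntimeException e) {
            Result<T> failed = new Result<>(null);
            failed.headers.put(ERROR_HEADER, e);  // the cause
            failed.headers.put(RAW_HEADER, data); // the original bytes
            return failed;
        }
    }

    /** Toy delegate: parses the bytes as a decimal integer and throws on bad input. */
    static Integer parseInt(byte[] data) {
        return Integer.valueOf(new String(data, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        Result<Integer> ok = deserialize(ErrorHandlingSketch::parseInt,
                "42".getBytes(StandardCharsets.UTF_8));
        Result<Integer> bad = deserialize(ErrorHandlingSketch::parseInt,
                "not-a-number".getBytes(StandardCharsets.UTF_8));

        System.out.println(ok.value);                                          // prints 42
        System.out.println(bad.value + " " + bad.headers.containsKey(ERROR_HEADER)); // prints null true
    }
}
```

In the real library the container inspects the header and routes the failed record to the error handler; in this sketch a caller would check `headers` for `ERROR_HEADER` to make the same decision.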

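You can use the DefaultKafkaConsumerFactory constructor that takes key and value Deserializer objects and wire in appropriate ErrorHandlingDeserializer instances that you have configured with the proper delegates. Alternatively, you can use consumer configuration properties (which are used by the ErrorHandlingDeserializer) to instantiate the delegates. The property names are ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS and ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS. The property value can be a class or a class name. The following example shows how to set these properties: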

// ... other props
// Wrap both the key and the value deserializer in ErrorHandlingDeserializer
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
// Delegates that do the actual deserialization (a class or a class name is accepted)
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey");
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue");
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example");
return new DefaultKafkaConsumerFactory<>(props);

With Spring Boot:

...
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
...

0 votes

In my case, my consumer was consuming the following generic type

@KafkaListener(
    topics = "${spring.kafka.topics.foo}",
    containerFactory = "foo")
public <T extends DomainEvent> void listen(List<T> domainEvents, Acknowledgment acknowledgment) {
    ...
}

and it complained about a new subtype I had added, ChildFoo2, which extends my DomainEvent. I had forgotten to register the new subclass as a JsonSubType on the parent class.

So the following was the actual fix in my case

@JsonSubTypes({@Type(ChildFoo1.class), @Type(ChildFoo2.class)})
public class DomainEvent {
...
}