反序列化后如何处理SerializationException

问题描述 投票:6回答:1

我在Spring Kafka设置中使用Avro和Schema注册表。

我想以某种方式处理SerializationException,它可能在反序列化期间抛出。

我找到了以下两个资源:

https://github.com/spring-projects/spring-kafka/issues/164

How do I configure spring-kafka to ignore messages in the wrong format?

这些资源表明我返回null而不是在反序列化时抛出SerializationException并监听KafkaNull。这个解决方案很好用。

但是,我希望能够抛出异常而不是返回null。

KIP-161KIP-210为处理异常提供了更好的功能。我确实在Spring Cloud中找到了一些提到KIP-161的资源,但没有具体说明Spring-Kafka。

有谁知道如何在Spring Boot中捕捉SerializationException

我使用的是Spring Boot 2.0.2

编辑:我找到了解决方案。

我宁愿抛出异常并抓住它而不是返回null或KafkaNull。我在多个不同的项目中使用我的自定义Avro序列化器和反序列化器,其中一些不是Spring。如果我更改了我的Avro序列化程序和反序列化程序,则需要更改其他一些项目以期望反序列化程序返回null。

我想关闭容器,这样我就不会丢失任何消息。在生产中永远不应该期望SerializationException。只有在Schema Registry关闭或者以某种方式将未格式化的消息发送到生产kafka时,才能发生SerializationException。无论哪种方式,SerializationException应该只发生很少,如果它发生,那么我想关闭容器,以便没有消息丢失,我可以调查问题。

只需考虑将从您的消费者容器中捕获所有异常。在我的具体情况下,我只想关闭,如果它是一个SerializationException

public class SerializationExceptionHandler extends ContainerStoppingErrorHandler {

    @Override
    public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer,
                       MessageListenerContainer container) {

        //Only call super if the exception is SerializationException
        if (thrownException instanceof SerializationException) {
            //This will shutdown the container.
            super.handle(thrownException, records, consumer, container);
        } else {
            //Wrap and re-throw the exception
            throw new KafkaException("Kafka Consumer Container Error", thrownException);
        }
    }
}

此处理程序将传递给使用者容器。下面是一个KafkaListenerContainerFactory豆的例子。

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory(JpaTransactionManager jpa, KafkaTransactionManager<?, ?> kafka) {

    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(1);
    factory.getContainerProperties().setPollTimeout(3000);

    factory.getContainerProperties().setErrorHandler(new SerializationExceptionHandler());

    factory.getContainerProperties().setTransactionManager(chainedTxM(jpa, kafka));
    return factory;
}
serialization error-handling deserialization spring-kafka
1个回答
6
投票

春天无能为力;反序列化发生在消费者获取任何数据之前。您需要增强解串器。

但是,我希望能够抛出异常而不是返回null。

这对任何事都没有帮助,因为Kafka不知道如何处理异常。再次;这一切都在数据可用之前发生,因此返回null(或其他一些特殊值)是最好的技术。

编辑

在2.2中,我们添加了一个error handling deserializer,它委托给实际的反序列化器并返回null,但在头文件中有例外;然后,侦听器容器将其直接传递给错误处理程序而不是侦听器。

© www.soinside.com 2019 - 2024. All rights reserved.