How do I handle exceptions on the consumer side in a Spring XML app?


How do I handle errors in a Spring XML + Kafka app? I use JSON to produce and consume messages, but when the consumer receives junk data, it runs in an infinite loop.

The code is uploaded here: https://github.com/javaHelper/spring-boot-advance-demos/tree/main/spring-xml-kafka-json-pro-con

Here is what I have used:

public class EmployeeKafkaConsumer2 {

    public EmployeeKafkaConsumer2(String topic) {
        initializeConsumer(topic);
    }

    private void initializeConsumer(String topic) {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.CLIENT_ID_CONFIG, AppConfig.CLIENT_ID);
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfig.BOOTSTRAP_SERVER);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // This will not handle junk data
        //propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AppJsonDeserializer.class);

        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        propsMap.put("spring.kafka.consumer.properties.spring.deserializer.value.delegate.class", org.springframework.kafka.support.serializer.JsonDeserializer.class);

        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        propsMap.put(AppJsonDeserializer.VALUE_CLASS_NAME_CONFIG, Employee.class);
        propsMap.put(org.springframework.kafka.support.serializer.JsonDeserializer.TRUSTED_PACKAGES, "*");
        propsMap.put(org.springframework.kafka.support.serializer.JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.kafka.model.Employee");

        DefaultKafkaConsumerFactory<String, Employee> kafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(
                propsMap,
                new StringDeserializer(),
                new AppJsonDeserializer<Employee>()
        );

        ContainerProperties containerProperties = new ContainerProperties(topic);
        containerProperties.setMessageListener(new MyMessageListener());

        ConcurrentMessageListenerContainer<String, Employee> container = new ConcurrentMessageListenerContainer<>(
            kafkaConsumerFactory,
            containerProperties
        );
        container.start();
     }
}

spring.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context-3.0.xsd">

    <bean id="employeeProducer" class="com.example.kafka.producer.EmployeeProducer" />

    <bean id="myListener" class="com.example.kafka.listener.MyMessageListener" />

    <bean id="employeeConsumer" class="com.example.kafka.consumer.EmployeeKafkaConsumer2" >
        <constructor-arg name="topic" value="t-employee" />
    </bean>
</beans>
spring apache-kafka spring-kafka
1 Answer

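OK. I was not able to run your application because it is part of a bigger project, which makes it hard for me to isolate. However, I can see the problem. You do this: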

    DefaultKafkaConsumerFactory<String, Employee> kafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(
            propsMap,
            new StringDeserializer(),
            new AppJsonDeserializer<Employee>()
    );

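Deserializer instances passed to the DefaultKafkaConsumerFactory constructor take precedence over the properties map, so all of those ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG configuration properties are ignored.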
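For the configuration to take effect, the deserializers would have to be selected by properties alone. The sketch below shows one way such a map might look. It uses only java.util and spells out the string values behind the constants (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.VALUE_DEFAULT_TYPE, JsonDeserializer.TRUSTED_PACKAGES). The class name ConsumerPropsSketch and the broker address localhost:9092 are assumptions for illustration only. Note that the delegate key has no "spring.kafka.consumer.properties." prefix; that prefix belongs to Spring Boot's application.properties, not to a raw consumer properties map.

```java
import java.util.HashMap;
import java.util.Map;

public class ConsumerPropsSketch {

    // Builds consumer properties in which the value deserializer is chosen
    // purely by configuration, with no deserializer instances involved.
    public static Map<String, Object> consumerProps(String bootstrapServers, String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG: the outer, error-tolerant wrapper
        props.put("value.deserializer",
                "org.springframework.kafka.support.serializer.ErrorHandlingDeserializer");
        // ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS: the real deserializer it delegates to
        props.put("spring.deserializer.value.delegate.class",
                "org.springframework.kafka.support.serializer.JsonDeserializer");
        // JsonDeserializer.VALUE_DEFAULT_TYPE and JsonDeserializer.TRUSTED_PACKAGES
        props.put("spring.json.value.default.type", "com.example.kafka.model.Employee");
        props.put("spring.json.trusted.packages", "*");
        return props;
    }

    public static void main(String[] args) {
        // localhost:9092 is a placeholder broker address (assumption)
        consumerProps("localhost:9092", "group1")
                .forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Such a map would then be passed to the single-argument constructor, new DefaultKafkaConsumerFactory<>(props), so that no explicitly supplied deserializer instance overrides it.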

Your AppJsonDeserializer has no error handling of its own; it only logs the exception and rethrows it:

    try {
        return objectMapper.readValue(data, className);
    } catch (Exception e) {
        LOGGER.error("## Exception : {}", e.getMessage());
        throw new SerializationException(e);
    }

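So it is no surprise that deserialization loops forever on the same "junk" record: the exception is thrown out of poll(), the consumer position does not move past the bad record, and the next poll fails on that record again.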
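The loop, and the reason a tolerant wrapper breaks it, can be illustrated with a small simulation that uses only the JDK. This is not Kafka's or Spring's code: PoisonPillSketch, strictParse, tolerant and consume are hypothetical names, an integer parser stands in for the JSON deserializer, and maxAttempts exists only so the demonstration terminates. The wrapper mimics, in simplified form, the idea behind ErrorHandlingDeserializer: catch the delegate's exception and return null instead of failing.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class PoisonPillSketch {

    // Stand-in for a strict deserializer: throws on anything that is not an integer.
    public static Integer strictParse(byte[] data) {
        return Integer.valueOf(new String(data, StandardCharsets.UTF_8));
    }

    // Wraps a delegate so that a failure is recorded and null is returned instead of throwing.
    public static <T> Function<byte[], T> tolerant(Function<byte[], T> delegate, List<String> errors) {
        return data -> {
            try {
                return delegate.apply(data);
            } catch (RuntimeException e) {
                errors.add(e.getMessage());
                return null;
            }
        };
    }

    // Simulated poll loop: the position advances only when deserialization returns normally.
    // Returns the position reached after at most maxAttempts attempts.
    public static int consume(List<byte[]> log, Function<byte[], Integer> deserializer, int maxAttempts) {
        int position = 0;
        for (int attempt = 0; attempt < maxAttempts && position < log.size(); attempt++) {
            try {
                deserializer.apply(log.get(position));
                position++;
            } catch (RuntimeException e) {
                // Position unchanged: the next attempt hits the same record again.
            }
        }
        return position;
    }

    public static void main(String[] args) {
        List<byte[]> log = new ArrayList<>();
        for (String s : new String[] {"1", "junk", "3"}) {
            log.add(s.getBytes(StandardCharsets.UTF_8));
        }
        Function<byte[], Integer> strict = PoisonPillSketch::strictParse;
        List<String> errors = new ArrayList<>();

        System.out.println(consume(log, strict, 10));                   // prints 1 (stuck on "junk")
        System.out.println(consume(log, tolerant(strict, errors), 10)); // prints 3 (whole log consumed)
        System.out.println(errors.size());                              // prints 1
    }
}
```

In the real application the equivalent step would be to let ErrorHandlingDeserializer wrap the JSON deserializer, so that a bad record reaches the container's error handling instead of failing every poll.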
