消费者ion Spring如何处理异常?

问题描述 投票:0回答:1

如何在 Spring XML + Kafka App 中进行错误处理?我使用 JSON 来生成和消费消息,但是当消费者获取垃圾数据时,它会运行无限循环。

这是我用过的

spring.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context-3.0.xsd">

    <bean id="employeeProducer" class="com.example.kafka.producer.EmployeeProducer" />

    <bean id="myListener" class="com.example.kafka.listener.MyMessageListener" />

    <bean id="employeeConsumer" class="com.example.kafka.consumer.EmployeeKafkaConsumer2" >
        <constructor-arg name="topic" value="t-employee" />
    </bean>
</beans>
spring apache-kafka spring-kafka
1个回答
2
投票

好的。我无法运行您的应用程序,因为它是更大项目的一部分,因此我很难将其隔离。然而我看到了问题所在。你这样做:

    DefaultKafkaConsumerFactory<String, Employee> kafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(
            propsMap,
            new StringDeserializer(),
            new AppJsonDeserializer<Employee>()
    );

因此所有这些

ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
配置道具都将被忽略。

因为您的

AppJsonDeserializer
中没有任何错误处理:

    try {
        return objectMapper.readValue(data, className);
    } catch (Exception e) {
        LOGGER.error("## Exception : {}", e.getMessage());
        throw new SerializationException(e);
    }

在反序列化过程中,相同的“垃圾”记录会出现无限循环,这并不奇怪。

更新

我仍然无法正确运行你的应用程序:太多的docker容器,这些容器与主题无关。

我认为解决方案一定是这样的:

private void initializeConsumer(String topic) {
    Map<String, Object> propsMap = new HashMap<>();
    propsMap.put(ConsumerConfig.CLIENT_ID_CONFIG, AppConfig.CLIENT_ID);
    propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfig.BOOTSTRAP_SERVER);

    propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
    propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    propsMap.put(AppJsonDeserializer.VALUE_CLASS_NAME_CONFIG, Employee.class);

    DefaultKafkaConsumerFactory<String, Employee> kafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(
            propsMap,
            new StringDeserializer(),
            new ErrorHandlingDeserializer<>(new AppJsonDeserializer<>())
    );

    ContainerProperties containerProperties = new ContainerProperties(topic);
    containerProperties.setMessageListener(new MyMessageListener());

    ConcurrentMessageListenerContainer<String, Employee> container =
            new ConcurrentMessageListenerContainer<>(kafkaConsumerFactory, containerProperties);

    container.start();
 }

正如我所说:当您向

DefaultKafkaConsumerFactory
ctor 提供实例时,解串器的那些道具将被忽略。 因此,您的
AppJsonDeserializer
必须包裹在
ErrorHandlingDeserializer
中。

从bean构造函数中执行

container.start()
仍然是错误的。另外,最好有
DefaultKafkaConsumerFactory
ConcurrentMessageListenerContainer
作为豆子。这样它们的配置和生命周期将由 Spring 正确管理。

© www.soinside.com 2019 - 2024. All rights reserved.