如何在 Spring XML + Kafka App 中进行错误处理?我使用 JSON 来生成和消费消息,但是当消费者获取垃圾数据时,它会运行无限循环。
这是我用过的
spring.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd">
<bean id="employeeProducer" class="com.example.kafka.producer.EmployeeProducer" />
<bean id="myListener" class="com.example.kafka.listener.MyMessageListener" />
<bean id="employeeConsumer" class="com.example.kafka.consumer.EmployeeKafkaConsumer2" >
<constructor-arg name="topic" value="t-employee" />
</bean>
</beans>
好的。我无法运行您的应用程序,因为它是更大项目的一部分,因此我很难将其隔离。然而我看到了问题所在。你这样做:
DefaultKafkaConsumerFactory<String, Employee> kafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(
propsMap,
new StringDeserializer(),
new AppJsonDeserializer<Employee>()
);
因此所有这些
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
配置道具都将被忽略。
因为您的
AppJsonDeserializer
中没有任何错误处理:
try {
return objectMapper.readValue(data, className);
} catch (Exception e) {
LOGGER.error("## Exception : {}", e.getMessage());
throw new SerializationException(e);
}
在反序列化过程中,相同的“垃圾”记录会出现无限循环,这并不奇怪。
更新
我仍然无法正确运行你的应用程序:太多的docker容器,这些容器与主题无关。
我认为解决方案一定是这样的:
private void initializeConsumer(String topic) {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.CLIENT_ID_CONFIG, AppConfig.CLIENT_ID);
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfig.BOOTSTRAP_SERVER);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
propsMap.put(AppJsonDeserializer.VALUE_CLASS_NAME_CONFIG, Employee.class);
DefaultKafkaConsumerFactory<String, Employee> kafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(
propsMap,
new StringDeserializer(),
new ErrorHandlingDeserializer<>(new AppJsonDeserializer<>())
);
ContainerProperties containerProperties = new ContainerProperties(topic);
containerProperties.setMessageListener(new MyMessageListener());
ConcurrentMessageListenerContainer<String, Employee> container =
new ConcurrentMessageListenerContainer<>(kafkaConsumerFactory, containerProperties);
container.start();
}
正如我所说:当您向
DefaultKafkaConsumerFactory
ctor 提供实例时,解串器的那些道具将被忽略。
因此,您的 AppJsonDeserializer
必须包裹在 ErrorHandlingDeserializer
中。
从bean构造函数中执行
container.start()
仍然是错误的。另外,最好有 DefaultKafkaConsumerFactory
和 ConcurrentMessageListenerContainer
作为豆子。这样它们的配置和生命周期将由 Spring 正确管理。