Error creating bean of type [org.springframework.kafka.support.serializer.ErrorHandlingDeserializer] while setting constructor argument


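Following "How to handle exceptions in consumers in a Spring XML app?", I am trying to convert my Java bean wiring to XML bean definitions and I am getting the error below.

Can someone explain what is going wrong?

Error: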

Exception in thread "main" org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'cf' defined in class path resource [context.xml]: Cannot resolve reference to bean 'valueDeserializer' while setting constructor argument
    at org.springframework.beans.factory.support.BeanDefinitionValueResolver.resolveReference(BeanDefinitionValueResolver.java:377)
    at org.springframework.beans.factory.support.BeanDefinitionValueResolver.resolveValueIfNecessary(BeanDefinitionValueResolver.java:135)
    at org.springframework.beans.factory.support.ConstructorResolver.resolveConstructorArguments(ConstructorResolver.java:681)
    at org.springframework.beans.factory.support.ConstructorResolver.autowireConstructor(ConstructorResolver.java:202)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.autowireConstructor(AbstractAutowireCapableBeanFactory.java:1350)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1187)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:558)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:518)
    at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:326)
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234)
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:324)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200)
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:973)
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:952)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:615)
    at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:144)
    at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:85)
    at com.example.kafka.MyMainApp.main(MyMainApp.java:15)
Caused by: org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'valueDeserializer' defined in class path resource [context.xml]: Unsatisfied dependency expressed through constructor parameter 0: Could not convert argument value of type [java.lang.String] to required type [org.apache.kafka.common.serialization.Deserializer]: Failed to convert value of type 'java.lang.String' to required type 'org.apache.kafka.common.serialization.Deserializer'; Cannot convert value of type 'java.lang.String' to required type 'org.apache.kafka.common.serialization.Deserializer': no matching editors or conversion strategy found
    at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:756)
    at org.springframework.beans.factory.support.ConstructorResolver.autowireConstructor(ConstructorResolver.java:236)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.autowireConstructor(AbstractAutowireCapableBeanFactory.java:1350)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1187)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:558)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:518)
    at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:326)
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234)
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:324)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200)
    at org.springframework.beans.factory.support.BeanDefinitionValueResolver.resolveReference(BeanDefinitionValueResolver.java:365)
    ... 17 more

Code

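import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;

// AppConfig, AppJsonDeserializer, Employee and MyMessageListener are application classes (imports omitted).
// The class name must match the constructor below and the class referenced in the XML.
public class EmployeeKafkaConsumer2 {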

    public EmployeeKafkaConsumer2(String topic) {
        initializeConsumer(topic);
    }

    private void initializeConsumer(String topic) {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.CLIENT_ID_CONFIG, AppConfig.CLIENT_ID);
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, AppConfig.BOOTSTRAP_SERVER);

        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        propsMap.put(AppJsonDeserializer.VALUE_CLASS_NAME_CONFIG, Employee.class);

        DefaultKafkaConsumerFactory<String, Employee> kafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(
                propsMap,
                new StringDeserializer(),
                new ErrorHandlingDeserializer<>(new AppJsonDeserializer<>())
        );

        ContainerProperties containerProperties = new ContainerProperties(topic);
        containerProperties.setMessageListener(new MyMessageListener());

        ConcurrentMessageListenerContainer<String, Employee> container =
                new ConcurrentMessageListenerContainer<>(kafkaConsumerFactory, containerProperties);
        container.start();
    }
}

XML

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context-3.0.xsd">

    <bean id="employeeProducer" class="com.example.kafka.producer.EmployeeProducer" />

    <bean id="cf" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg index="0">
            <map>
                <entry key="bootstrap.servers" value="localhost"/>
                <entry key="auto.offset.reset" value="latest"/>
                <entry key="group.id" value="group1" />
                <entry key="client.id" value="my-client-id" />
                <entry key="max.poll.records" value="1"/>
                <entry key="value.class.name" value="com.example.kafka.model.Employee" />
            </map>
        </constructor-arg>
        <constructor-arg index="1" name="keyDeserializer" value="org.apache.kafka.common.serialization.StringDeserializer" />
        <constructor-arg index="2" ref="valueDeserializer">

        </constructor-arg>

    </bean>

    <bean id="valueDeserializer" class="org.springframework.kafka.support.serializer.ErrorHandlingDeserializer">
        <constructor-arg name="delegate" value="com.example.kafka.serdes.AppJsonDeserializer" />
    </bean>

    <bean id="cp" class="org.springframework.kafka.listener.ContainerProperties">
        <constructor-arg name="topics" value="t-employee"/>
        <property name="groupId" value="group1"/>
        <property name="messageListener" ref="myListener"/>
    </bean>

    <bean id="container" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
        <constructor-arg ref="cf"/>
        <constructor-arg ref="cp" />
    </bean>


    <bean id="myListener" class="com.example.kafka.listener.MyMessageListener" />

    <bean id="employeeConsumer" class="com.example.kafka.consumer.EmployeeKafkaConsumer2" >
        <constructor-arg name="topic" value="t-employee" />
    </bean>
</beans>
spring apache-kafka spring-kafka
1 Answer

Your bean definition is wrong:

<bean id="valueDeserializer" class="org.springframework.kafka.support.serializer.ErrorHandlingDeserializer">
    <constructor-arg name="delegate" value="com.example.kafka.serdes.AppJsonDeserializer" />
</bean>

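The delegate constructor argument does not accept a String; it expects a Deserializer instance. With value="...", Spring passes the class name as a plain String and has no way to convert it into a Deserializer, which is what the "no matching editors or conversion strategy found" message in the stack trace reports. So the delegate has to be declared as a nested bean: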

<bean id="valueDeserializer" class="org.springframework.kafka.support.serializer.ErrorHandlingDeserializer">
    <constructor-arg name="delegate">
       <bean class="com.example.kafka.serdes.AppJsonDeserializer"/>
    </constructor-arg>
</bean>
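
The cf bean in the question passes its key deserializer the same way (value="org.apache.kafka.common.serialization.StringDeserializer"), so it will probably fail with the same String-to-Deserializer conversion error once valueDeserializer is fixed. Below is a sketch of both definitions with the deserializers supplied as bean instances. It reuses the bean ids, class names and properties from the question, assumes AppJsonDeserializer has a public no-argument constructor (as new AppJsonDeserializer<>() in the Java version suggests), and is a fragment meant to sit inside the existing <beans> element:

```xml
<bean id="cf" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
    <!-- Consumer properties, unchanged from the question -->
    <constructor-arg index="0">
        <map>
            <entry key="bootstrap.servers" value="localhost"/>
            <entry key="auto.offset.reset" value="latest"/>
            <entry key="group.id" value="group1"/>
            <entry key="client.id" value="my-client-id"/>
            <entry key="max.poll.records" value="1"/>
            <entry key="value.class.name" value="com.example.kafka.model.Employee"/>
        </map>
    </constructor-arg>
    <!-- Inner bean: a real Deserializer instance instead of a class-name String -->
    <constructor-arg index="1">
        <bean class="org.apache.kafka.common.serialization.StringDeserializer"/>
    </constructor-arg>
    <!-- Reference to the wrapped value deserializer defined below -->
    <constructor-arg index="2" ref="valueDeserializer"/>
</bean>

<bean id="valueDeserializer" class="org.springframework.kafka.support.serializer.ErrorHandlingDeserializer">
    <!-- The delegate is also an inner bean, so Spring injects an instance -->
    <constructor-arg name="delegate">
        <bean class="com.example.kafka.serdes.AppJsonDeserializer"/>
    </constructor-arg>
</bean>
```

The rule of thumb in Spring XML is that value="..." always yields a String (or something Spring can convert from a String), while ref="..." or a nested <bean> yields an object instance.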

Consider migrating to Java and annotation-based configuration to avoid the extra overhead of XML-style configuration.
