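I want to follow up on a further problem I ran into here: org.springframework.kafka.support.serializer.ErrorHandlingDeserializ' of type serializer.ErrorHandlingDeserializer] while setting constructor argument
I am now facing the following error with the XML shown below.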
21:15:17.173 [main] WARN o.s.c.s.ClassPathXmlApplicationContext - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'containerListener'
Exception in thread "main" org.springframework.context.ApplicationContextException: Failed to start bean 'containerListener'
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:186)
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:363)
at java.base/java.lang.Iterable.forEach(Iterable.java:75)
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:160)
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:128)
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:968)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:618)
at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:144)
at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:85)
at com.example.kafka.MyMainApp.main(MyMainApp.java:16)
Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class java.lang.Class (java.lang.String and java.lang.Class are in module java.base of loader 'bootstrap')
at com.example.kafka.serdes.AppJsonDeserializer.configure(AppJsonDeserializer.java:35)
at org.springframework.kafka.support.serializer.ErrorHandlingDeserializer.configure(ErrorHandlingDeserializer.java:137)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.lambda$valueDeserializerSupplier$9(DefaultKafkaConsumerFactory.java:194)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory$ExtendedKafkaConsumer.<init>(DefaultKafkaConsumerFactory.java:479)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:461)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:438)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:415)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:382)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:359)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:890)
at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:393)
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:510)
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:183)
... 9 more
XML
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
       http://www.springframework.org/schema/util
       http://www.springframework.org/schema/util/spring-util.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context-3.0.xsd">
    <bean id="employeeProducer" class="com.example.kafka.producer.EmployeeProducer" />
    <util:map id="utilMap" map-class="java.util.HashMap">
        <entry key="bootstrap.servers" value="localhost"/>
        <entry key="auto.offset.reset" value="latest"/>
        <entry key="group.id" value="group1" />
        <entry key="client.id" value="my-client-id" />
        <entry key="max.poll.records" value="1"/>
        <entry key="value.class.name" value="com.example.kafka.model.Employee" />
    </util:map>
    <bean id="keyDeserializer" class="org.apache.kafka.common.serialization.StringDeserializer" />
    <bean id="cf" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg index="0" ref="utilMap" />
        <constructor-arg index="1" ref="keyDeserializer" />
        <constructor-arg index="2" ref="valueDeserializer" />
        <constructor-arg index="3" value="true" />
    </bean>
    <bean id="valueDeserializer" class="org.springframework.kafka.support.serializer.ErrorHandlingDeserializer">
        <constructor-arg name="delegate">
            <bean class="com.example.kafka.serdes.AppJsonDeserializer"/>
        </constructor-arg>
    </bean>
    <bean id="cp" class="org.springframework.kafka.listener.ContainerProperties">
        <constructor-arg name="topics" value="t-employee"/>
        <property name="groupId" value="group1"/>
        <property name="messageListener" ref="myListener"/>
    </bean>
    <bean id="containerListener" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
        <constructor-arg index="0" ref="cf"/>
        <constructor-arg index="1" ref="cp" />
    </bean>
    <bean id="myListener" class="com.example.kafka.listener.MyMessageListener" />
    <bean id="employeeConsumer" class="com.example.kafka.consumer.EmployeeKafkaConsumer2" >
        <constructor-arg name="topic" value="t-employee" />
    </bean>
</beans>
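Initially I expected the implementation below to be called,
but even that does not work after I set the fourth constructor argument.
Your exception has nothing to do with the code you are pointing at: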
Caused by: java.lang.ClassCastException: class java.lang.String cannot be cast to class java.lang.Class (java.lang.String and java.lang.Class are in module java.base of loader 'bootstrap')
at com.example.kafka.serdes.AppJsonDeserializer.configure(AppJsonDeserializer.java:35)
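The exception has no connection to your DefaultKafkaConsumerFactory definition.
It is thrown from AppJsonDeserializer.configure (AppJsonDeserializer.java:35), where a value from the consumer properties is cast to Class. The map you build in XML only contains String values, so that cast fails.
You need to look at your AppJsonDeserializer and resolve that String to a Class yourself, because your custom code requires a Class.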
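The source of AppJsonDeserializer is not shown, so the following is only a sketch of what "resolve the String to a Class" could look like. It assumes that line 35 reads the value.class.name entry from the config map and casts it to Class. TargetTypeResolver and its resolve method are hypothetical names introduced for illustration; only the property key comes from the XML above.

```java
import java.util.Map;

// Hypothetical helper: the real AppJsonDeserializer is not shown in the question.
final class TargetTypeResolver {

    // Property key taken from the <util:map> in the question.
    static final String VALUE_CLASS_NAME = "value.class.name";

    private TargetTypeResolver() {
    }

    /**
     * Returns the target type from the consumer configuration.
     * Accepts a Class (set programmatically) or a fully qualified
     * class name (what an XML <entry value="..."/> produces).
     */
    public static Class<?> resolve(Map<String, ?> configs) {
        Object raw = configs.get(VALUE_CLASS_NAME);
        if (raw instanceof Class) {
            return (Class<?>) raw;
        }
        if (raw instanceof String) {
            try {
                // Load the class by name instead of casting the String.
                return Class.forName(((String) raw).trim());
            } catch (ClassNotFoundException e) {
                throw new IllegalArgumentException(
                        "Cannot load class for " + VALUE_CLASS_NAME + ": " + raw, e);
            }
        }
        throw new IllegalArgumentException(
                VALUE_CLASS_NAME + " must be a Class or a class name, got: " + raw);
    }
}
```

Inside configure(Map<String, ?> configs, boolean isKey) of the custom deserializer, the direct cast would then be replaced by a call such as TargetTypeResolver.resolve(configs). Handling both cases keeps the deserializer usable from XML (String values) and from Java configuration (Class values).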