Spring Kafka 无法序列化对象

问题描述 投票:0回答:1

我正在将一个对象放入 Kafka 队列中。 在 Kafka 上放置对象时出现此错误

10:40:51,823错误[org.springframework.scheduling.support.TaskUtils$LoggingErrorHandler](scheduling-1)计划任务中发生意外错误:org.apache.kafka.common.errors.SerializationException:无法转换值类 com.tlx.afxintegrations.commons.models.request.TransactionalJournalRequestDto 到 value.serializer 中指定的类 org.apache.kafka.common.serialization.StringSerializer 在deployment.afx-integrations.war //org.apache.kafka.clients. Producer.KafkaProducer.doSend(KafkaProducer.java:957) 在deployment.afx-integrations.war //org.apache.kafka.clients. Producer.KafkaProducer.send(KafkaProducer.java:914) 在deployment.afx-integrations.war //org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:993) 在deployment.afx-integrations.war //org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:655) 在deployment.afx-integrations.war //org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:403) 在deployment.afx-integrations.war//com.tlx.afxintegrations.inbound.accountingentries.listeners.FCYTransactionsListener.listenForFCYTransactions(FCYTransactionsListener.java:149) 在 jdk.internal.reflect.GenerateMethodAccessor90.invoke(来源未知) 在java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.base/java.lang.reflect.Method.invoke(Method.java:568) 在deployment.afx-integrations.war //org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84) 在deployment.afx-integrations.war //org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) 在 java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) 在 java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) 在 java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) 在 java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) 在 java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) 在 java.base/java.lang.Thread.run(Thread.java:840) 导致:java.lang.ClassCastException:类com.tlx.afxintegrations.commons.models.request.TransactionalJournalRequestDto无法转换为类java.lang.String(com.tlx.afxintegrations.commons.models.request.TransactionalJournalRequestDto处于未命名状态加载器'deployment.afx-integrations.war'的模块@155f826;java.lang.String位于加载器'bootstrap'的模块java.base中) 在deployment.afx-integrations.war//org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:29) 在deployment.afx-integrations.war//org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62) 在deployment.afx-integrations.war //org.apache.kafka.clients. Producer.KafkaProducer.doSend(KafkaProducer.java:954) ...还有16个

我们正在将一个对象放入 Kafka 队列中。

期待这个连载。

serialization apache-kafka spring-kafka
1个回答
0
投票

看起来您使用了默认的 Spring Boot 自动配置,默认情况下为

org.apache.kafka.common.serialization.StringSerializer
属性提供
value.serializer

    /**
     * Serializer class for values.
     */
    private Class<?> valueSerializer = StringSerializer.class;

由于您没有从应用程序发送

String
,因此您的
com.tlx.afxintegrations.commons.models.request.TransactionalJournalRequestDto
无法序列化为 Kafka 记录也就不足为奇了。

考虑将

spring.kafka.producer.value-serializer
Spring Boot 属性配置为适合您的
TransactionalJournalRequestDto
序列化逻辑的任何内容。

https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#messaging.kafka.additional-properties

https://docs.spring.io/spring-kafka/reference/kafka/serdes.html

© www.soinside.com 2019 - 2024. All rights reserved.