Spring Cloud Stream batch producer throws `SerializationException`

Question  Votes: 0  Answers: 1

I am trying to use the batch producer described in the doc.

I created the following Supplier:

@Bean
public Supplier<List<Message<String>>> fetch() {
    return () -> List.of(
            MessageBuilder.withPayload("message-1").build(),
            MessageBuilder.withPayload("message-2").build(),
            MessageBuilder.withPayload("message-3").build(),
            MessageBuilder.withPayload("message-4").build());
}

As I understand it, each message should be sent individually. Instead, I get the following exception:

2024-02-24T08:43:58.973+01:00 ERROR 2143477 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@3da51e74], failedMessage=GenericMessage [payload=[GenericMessage [payload=byte[7], headers={contentType=application/json, id=6d584b7d-b699-7fb6-3e8c-9c9363c68b30, timestamp=1708760638972}], GenericMessage [payload=byte[7], headers={contentType=application/json, id=c2515099-eb7c-ee10-a9ad-6a5370ad052c, timestamp=1708760638972}], GenericMessage [payload=byte[7], headers={contentType=application/json, id=836d0275-aceb-09da-1f6d-ee85e724725b, timestamp=1708760638972}], GenericMessage [payload=byte[7], headers={contentType=application/json, id=cbc18cbf-d3b1-47f9-a557-fa3fb7cc6379, timestamp=1708760638972}]], headers={id=50b82042-6d93-a0b2-62b5-d2793520d76a, timestamp=1708760638972}]
        at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
        at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:108)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
        at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler.handleMessage(KafkaMessageChannelBinder.java:1567)
        at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1196)
        at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:132)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
        at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:378)
        at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:349)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:329)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
        at org.springframework.integration.router.AbstractMessageRouter.doSend(AbstractMessageRouter.java:228)
        at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:210)
        at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
        at org.springframework.integration.handler.AbstractMessageHandler.handleWithMetrics(AbstractMessageHandler.java:90)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:70)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:132)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
        at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:378)
        at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:349)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:329)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:302)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
        at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:206)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.messageReceived(AbstractPollingEndpoint.java:481)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:467)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:419)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:355)
        at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
        at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
        at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$5(AbstractPollingEndpoint.java:348)
        at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
        at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:96)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
        at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.apache.kafka.common.errors.SerializationException: Can't convert value of class java.util.ArrayList to class org.apache.kafka.common.serialization.ByteArraySerializer specified in value.serializer
        at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1003)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:947)
        at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:1050)
        at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:799)
        at org.springframework.kafka.core.KafkaTemplate.observeSend(KafkaTemplate.java:768)
        at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:577)
        at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:532)
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:145)
        at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
        ... 50 more
Caused by: java.lang.ClassCastException: class java.util.ArrayList cannot be cast to class [B (java.util.ArrayList and [B are in module java.base of loader 'bootstrap')
        at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)
        at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
        at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1000)
        ... 58 more

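It looks serialization-related, but I can send:

  • List<String>: a single message whose payload is the list of strings as JSON
  • Message<String>: a single message with a single string
  • Message<List<String>>: same as the first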

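So I doubt that serialization itself is the problem. It must be related to batch producing, or maybe to a combination of the two. The project uses Spring Boot 3.2.2 and Spring Cloud Stream 4.1.0.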

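I am currently using a workaround: sending the messages individually with StreamBridge. That works fine, but I would like to understand why the functional approach does not work. Am I doing something wrong? Or could this be a bug in Spring Cloud Stream?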
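For readers unfamiliar with the workaround, the idea of sending each element on its own can be sketched in plain Java. This is only an illustration under stated assumptions: the Sender interface is a hypothetical stand-in that mirrors the shape of StreamBridge.send(bindingName, data), and the binding name fetch-out-0 is assumed from the function name; neither is taken from the original project.

```java
import java.util.ArrayList;
import java.util.List;

public class IndividualSendSketch {

    /** Hypothetical stand-in with the same shape as StreamBridge.send(String, Object). */
    public interface Sender {
        boolean send(String bindingName, Object data);
    }

    /** Sends each payload as its own message and returns how many sends succeeded. */
    public static int sendEach(Sender sender, String bindingName, List<String> payloads) {
        int sent = 0;
        for (String payload : payloads) {
            if (sender.send(bindingName, payload)) {
                sent++;
            }
        }
        return sent;
    }

    public static void main(String[] args) {
        // A recording sender replaces the real bridge so the sketch runs without a broker.
        List<Object> recorded = new ArrayList<>();
        Sender recordingSender = (bindingName, data) -> recorded.add(data);

        // "fetch-out-0" is an assumed binding name, not one confirmed by the question.
        int sent = sendEach(recordingSender, "fetch-out-0",
                List.of("message-1", "message-2", "message-3", "message-4"));

        System.out.println(sent + " messages sent individually: " + recorded);
    }
}
```

In a real application the lambda would be replaced by a call to the injected StreamBridge, so that every payload becomes one Kafka record instead of the whole list being handed over as a single value.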

java spring-boot spring-cloud-stream spring-cloud-stream-binder-kafka
1 Answer
0 votes

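The exception you are seeing (SerializationException) is indeed related to how the message is serialized before it is sent to Kafka. The error message shows that the configured value serializer, ByteArraySerializer, expects a byte array ([B) but received an ArrayList. This is probably caused by the way the batch producer handles the messages.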
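The root ClassCastException can be reproduced without Kafka. The sketch below is an illustration only: the nested Serializer interface and PassThroughSerializer class are simplified stand-ins for Kafka's Serializer and ByteArraySerializer, not the real classes. It shows that a serializer typed for byte[] fails as soon as a List reaches it through an untyped call path.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class SerializerCastDemo {

    /** Simplified stand-in for Kafka's Serializer<T>; not the real interface. */
    public interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    /** Stand-in for ByteArraySerializer: passes a byte[] through unchanged. */
    public static final class PassThroughSerializer implements Serializer<byte[]> {
        @Override
        public byte[] serialize(String topic, byte[] data) {
            return data;
        }
    }

    /** Calls the serializer through a raw reference, so the type check only happens at runtime. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static String trySerialize(Object value) {
        Serializer raw = new PassThroughSerializer();
        try {
            byte[] bytes = raw.serialize("demo-topic", value);
            return "ok:" + bytes.length;
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        byte[] single = "message-1".getBytes(StandardCharsets.UTF_8);

        // A single byte[] payload is accepted.
        System.out.println(trySerialize(single)); // prints "ok:9"

        // A list of payloads is rejected, mirroring "ArrayList cannot be cast to class [B".
        List<Object> batch = new ArrayList<>(List.of(single));
        System.out.println(trySerialize(batch)); // prints "ClassCastException"
    }
}
```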

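The failedMessage in your log shows what happened: each inner Message already carries a byte[] payload, but the enclosing list was handed to the Kafka producer as a single record value instead of being split into individual records. ByteArraySerializer cannot serialize a List of Message objects, hence the ClassCastException.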

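To get past this error, you would need to provide a serializer that can handle the batch. One way to attempt this is to configure a ProducerFactory bean with explicit key and value serializers. Here is an example: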

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
public class KafkaConfig {

    @Bean
    public ProducerFactory<String, Object> kafkaProducerFactory(Serializer<String> keySerializer,
                                                                Serializer<Object> valueSerializer) {
        return new DefaultKafkaProducerFactory<>(producerConfigs(), keySerializer, valueSerializer);
    }

    @Bean
    public Serializer<String> keySerializer() {
        return new StringSerializer();
    }

    @Bean
    public Serializer<Object> valueSerializer() {
        // MessagingMessageConverter is not a Kafka Serializer, so it cannot be returned here.
        // JsonSerializer (Spring Kafka) is used as a stand-in; use a custom Serializer if needed.
        return new JsonSerializer<>();
    }

    // Assumed helper: the broker address is a placeholder, adjust it for your environment.
    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return props;
    }

    // Other Kafka configurations
}

In this configuration:

* A ProducerFactory bean is defined with explicit serializers for both keys and values.
* The value serializer is JsonSerializer from Spring Kafka. MessagingMessageConverter, also a Spring Kafka class, converts between Kafka records and Message objects but does not implement Serializer, so it cannot be used in this position.
* You can replace JsonSerializer with a custom Serializer if needed.

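Once this configuration is in place, Spring Cloud Stream should be able to serialize the batch before sending it to Kafka. Note that the Kafka binder normally builds its own producer from the binder configuration properties, so verify that this factory is actually being used. Also keep in mind that serializing the list this way sends it as one record; it does not split the batch into individual messages.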
