在ChunkProcessorChunkHandler上找不到Spring Batch SpelEvaluationException方法

问题描述 投票:0回答:1

我有一个通过RabbitMQ执行远程分块的Spring Batch作业基本上,有两个应用程序master和worker,每个都是一个spring-boot应用程序。遵循主配置:

@Bean
public DirectChannel requestsToWorkers() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow outboundFlow() {
    return IntegrationFlows
            .from(requestsToWorkers())
            .handle(Amqp.outboundAdapter(configurableRabbitTemplate).routingKey("master-route"))
            .get();
}

@Bean
public QueueChannel repliesFromWorkers() {
    return new QueueChannel();
}

@Bean
public IntegrationFlow inboundFlow() {
    return IntegrationFlows
            .from(Amqp.inboundAdapter(configurableRabbitMqConnectionFactory, messageBrokerProperties.getMessageQueue().getQueueName()))
            .channel(repliesFromWorkers())
            .get();
}

@Bean
public ItemReader<Integer> testIntegerItemReader() {
    return () -> new Random().nextInt();
}

@Bean
public ItemWriter<Integer> testAqmpItemWriter() {
    final MessagingTemplate messagingTemplate = new MessagingTemplate();
    messagingTemplate.setDefaultChannel(requestsToWorkers());
    messagingTemplate.setReceiveTimeout(3000);
    final ChunkMessageChannelItemWriter<Integer> chunkMessageChannelItemWriter = new ChunkMessageChannelItemWriter<>();
    chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate);
    chunkMessageChannelItemWriter.setReplyChannel(repliesFromWorkers());
    return chunkMessageChannelItemWriter;
}

@Bean
public Job testJob(final ItemReader<Integer> testIntegerItemReader,
                   final ItemWriter<Integer> testAqmpItemWriter) {
    return jobBuilderFactory.get("testJob")
            .incrementer(new RunIdIncrementer())
            .start(
                    stepBuilderFactory.get("masterStep")
                            .<Integer, Integer>chunk(1)
                            .reader(testIntegerItemReader)
                            .writer(testAqmpItemWriter)
                            .build()
            )
            .build();
}

和适当的工人配置:

@Bean
public DirectChannel requestsChannel() {
    return new DirectChannel();
}

@Bean
public DirectChannel repliesChannel() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow inboundFlow() {
    return IntegrationFlows
            .from(Amqp.inboundAdapter(configurableRabbitMqConnectionFactory, messageBrokerProperties.getMessageQueue().getQueueName()))
            .channel(requestsChannel())
            .get();
}

@Bean
public IntegrationFlow outboundFlow() {
    return IntegrationFlows
            .from(repliesChannel())
            .handle(Amqp.outboundAdapter(configurableRabbitTemplate).routingKey("worker-route"))
            .get();
}

@Bean
public ItemProcessor<Integer, Integer> processor() {
    return account -> {
        System.out.println("Processed random int " + account);
        return account;
    };
}

@Bean
public ItemWriter<Integer> writer() {
    return response -> System.out.println("Value written to AMQP " + response);
}

@Bean
@ServiceActivator(inputChannel = "requestsChannel", outputChannel = "repliesChannel")
public ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler(final ItemProcessor<Integer, Integer> processor,
                                                                      final ItemWriter<Integer> writer) {
    final SimpleChunkProcessor<Integer, Integer> chunkProcessor = new SimpleChunkProcessor<>(processor, writer);
    final ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler = new ChunkProcessorChunkHandler<>();
    chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor);
    return chunkProcessorChunkHandler;
}

每个模块都使用SpringBoot 2.1.2。总体而言,一切看起来都很好,数据块通过RabbitMQ成功地从主服务器发送到工作服务器,但是当工作者试图实际读取消息时,我收到以下异常

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1613) ~[spring-rabbit-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1517) ~[spring-rabbit-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1440) ~[spring-rabbit-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1428) ~[spring-rabbit-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1423) ~[spring-rabbit-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1372) ~[spring-rabbit-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:870) [spring-rabbit-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:854) [spring-rabbit-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:78) [spring-rabbit-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1137) [spring-rabbit-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1043) [spring-rabbit-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_181]
Caused by: org.springframework.messaging.MessageHandlingException: nested exception is org.springframework.expression.spel.SpelEvaluationException: EL1004E: Method call: Method handleChunk(byte[]) cannot be found on type org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:109) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:93) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:431) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:284) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:265) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:223) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:129) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:205) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$600(AmqpInboundChannelAdapter.java:57) ~[spring-integration-amqp-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.createAndSend(AmqpInboundChannelAdapter.java:237) ~[spring-integration-amqp-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:204) ~[spring-integration-amqp-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1514) ~[spring-rabbit-2.1.3.RELEASE.jar:2.1.3.RELEASE]
... 10 common frames omitted

已经花了很多时间在这个问题上真的很感激帮助

spring-boot spring-integration spring-batch amqp
1个回答
0
投票

handleChunk期望一个ChunkRequest,而不是byte[]所以看起来在入站通道适配器中反序列化出了问题。它应该有一个SimpleMessageConverter(默认值)处理java.io.SerializableChunkRequest实现)。

如果没有内容类型的消息属性(如果转换器只是在不了解内容类型时返回byte[]),就会发生这种情况。

出站适配器使用相同的转换器,因此应该设置好。

查看Rabbit Management UI上的消息以确保标头正确无误。它应该有这个值application/x-java-serialized-object

您还可以通过启用调试日志记录来检查消息和标头。

热门问题
推荐问题
最新问题