我有一个 Spring Batch 应用程序,它成功运行了 Spring Batch 作业,但在声明多个 DirectChannel 时出现异常。
当我启动“firstJob”时发生异常。这是例外:
`Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'IFRSGoodBookService-1.secondReplies'.
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76)
at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:375)
at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:346)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:326)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:499)
at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:354)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:283)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:247)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:142)
at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
at org.springframework.integration.handler.AbstractMessageHandler.handleWithMetrics(AbstractMessageHandler.java:90)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:70)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:375)
at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:346)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:326)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:299)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.lambda$sendMessage$1(MessageProducerSupport.java:262)
at io.micrometer.observation.Observation.lambda$observe$0(Observation.java:493)
at io.micrometer.observation.Observation.observeWithContext(Observation.java:603)
at io.micrometer.observation.Observation.observe(Observation.java:492)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:262)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$200(AmqpInboundChannelAdapter.java:69)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.createAndSend(AmqpInboundChannelAdapter.java:397)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:360)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1663)
... 14 common frames omitted
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
... 50 common frames omitted`
这是我的 FlowConfig 类,它声明了通道:
@Configuration
@RequiredArgsConstructor
class FlowConfig {
private final QueueConfig queueConfig;
@Bean
public DirectChannel firstRequests() {
return new DirectChannel();
}
@Bean
public DirectChannel firstReplies() {
return new DirectChannel();
}
@Bean
public DirectChannel secondRequests() {
return new DirectChannel();
}
@Bean
public DirectChannel secondReplies() {
return new DirectChannel();
}
@Bean("secondManagerInBoundFlow")
@Profile("manager")
public IntegrationFlow secondManagerInBoundFlow() {
return queueConfig.getInboundAdapter(true, secondReplies());
}
@Bean("secondWorkerInBoundFlow")
@Profile("worker")
public IntegrationFlow secondInBoundFlow() {
return queueConfig.getInboundAdapter(false, secondRequests());
}
@Bean("secondManagerOutboundFlow")
@Profile("manager")
public IntegrationFlow secondManagerOutboundFlow() {
return queueConfig.getOutboundAdapter(true, secondRequests());
}
@Bean("secondWorkerOutboundFlow")
@Profile("worker")
public IntegrationFlow secondWorkerOutboundFlow() {
return queueConfig.getOutboundAdapter(false, secondReplies());
}
@Bean("firstManagerInBoundFlow")
@Profile("manager")
public IntegrationFlow firstManagerInBoundFlow() {
return queueConfig.getInboundAdapter(true, firstReplies());
}
@Bean("firstWorkerInBoundFlow")
@Profile("worker")
public IntegrationFlow firstWorkerInBoundFlow() {
return queueConfig.getInboundAdapter(false, firstRequests());
}
@Bean("firstManagerOutboundFlow")
@Profile("manager")
public IntegrationFlow firstManagerOutboundFlow() {
return queueConfig.getOutboundAdapter(true, firstRequests());
}
@Bean("firstWorkerOutboundFlow")
@Profile("worker")
public IntegrationFlow firstWorkerOutboundFlow() {
return queueConfig.getOutboundAdapter(false, firstReplies());
}
}
这是入站和出站适配器的实现:
@Configuration
@ConditionalOnProperty("spring.rabbitmq.enabled")
@RequiredArgsConstructor
public class RabbitMqQueueConfig implements QueueConfig {
private final ConnectionFactory connectionFactory;
private final RabbitTemplate defaultRabbitTemplate;
private final QueueConstants queueConstants;
@Override
public IntegrationFlow getInboundAdapter(boolean isManager, DirectChannel channel) {
String queueName = isManager ? queueConstants.getConstantWithPrefix(QueueConstants.JOB_REPLIES_QUEUE)
: queueConstants.getConstantWithPrefix(QueueConstants.JOB_REQUESTS_QUEUE);
return IntegrationFlow.from(Amqp.inboundAdapter(connectionFactory, queueName)).channel(channel).get();
}
@Override
public IntegrationFlow getOutboundAdapter(boolean isManager, DirectChannel channel) {
String queueName = isManager ? queueConstants.getConstantWithPrefix(QueueConstants.JOB_REQUESTS_QUEUE)
: queueConstants.getConstantWithPrefix(QueueConstants.JOB_REPLIES_QUEUE);
AmqpOutboundChannelAdapterSpec messageHandlerSpec = Amqp.outboundAdapter(defaultRabbitTemplate).routingKey(queueName);
return IntegrationFlow.from(channel).handle(messageHandlerSpec).get();
}
}
这是JobManager配置
@Configuration
@Profile("manager")
@EnableBatchIntegration
@AllArgsConstructor
public class JobManagerPartitionConfiguration {
private final JobRepository jobRepository;
private final RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory;
private PlatformTransactionManager transactionManager;
private DeleteDataTasklet deleteDataTasklet;
private InstDataLoader instDataLoader;
private final ApplicationProperties appProperties;
private final DirectChannel firstRequests;
private final DirectChannel firstReplies;
private final DirectChannel secondRequests;
private final DirectChannel secondReplies;
private final ManagerJobListener managerJobListener;
private final IdBoundaryPartitioner idBoundaryPartitioner;
private final ContextService contextService;
@Bean
public Step firstJobManagerStep() {
return managerStepBuilderFactory.get("firstJobManagerStep")
.partitioner("remotefirstJobAsStep", idBoundaryPartitioner)
.gridSize(appProperties.getJobParameters().getGridSize())
.outputChannel(firstRequests)
.inputChannel(firstReplies)
.listener(new SyncStepContextWithJob())
.build();
}
@Bean
public Step secondJobManagerStep() {
return managerStepBuilderFactory.get("secondJobManagerStep")
.partitioner("remoteSecondJobAsStep", idBoundaryPartitioner)
.gridSize(appProperties.getJobParameters().getGridSize())
.outputChannel(secondRequests)
.inputChannel(secondReplies)
.listener(new SyncStepContextWithJob())
.build();
}
@Bean
public Job secondJob(Step secondJobManagerStep) {
return new JobBuilder("secondJob", jobRepository).incrementer(new RunIdIncrementer())
.start(instDataLoaderStep())
.next(deleteTable())
.next(secodJobManagerStep)
.listener(contextService)
.listener(managerJobListener)
.build();
}
@Bean
public Job firstJob(Step firstJobManagerStep) {
return new JobBuilder("firstJob", jobRepository).incrementer(new RunIdIncrementer())
.start(instDataLoaderStep())
.next(deleteTable())
.next(firstJobManagerStep)
.listener(contextService)
.listener(managerJobListener)
.build();
}
这是我的 JobWorker 配置:
@Configuration
@Profile("worker")
@EnableBatchIntegration
@AllArgsConstructor
@Slf4j
public class JobWorkerPartitionConfiguration {
private RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;
private JobRepository jobRepository;
private JobCache<I9Data[]> cache;
private CacheJobListener<I9Data[]> jobListener;
private WorkerJobListener workerJobListener;
private StepMonitoringListener monitoringListener;
private ApplicationProperties appProperties;
private InstDataLoader instDataLoader;
private ContractLoader contractLoader;
private CacheItemWriter cacheItemWriter;
private PlatformTransactionManager transactionManager;
private InstDataJpaCache instDataJpaCache;
private ContextService contextService;
private MonitorService monitorService;
@Bean
@StepScope
Job remoteFirstJob(NamedParameterJdbcTemplate jdbcTemplate) {
return new JobBuilder("remoteFirstJob", jobRepository).start(instDataLoaderStep())
.next(contractLoaderTaskletStep())
.next(calculateStep())
.listener(contextService)
.listener(jobListener)
.listener(workerJobListener)
.listener(monitoringListener)
// .listener(new SyncStepContextWithJob(this.monitorService))
.build();
}
@Bean
public Step remoteFirstJobAsStep(
DirectChannel firstRequests,
DirectChannel firstReplies
) {
return workerStepBuilderFactory.get("remoteFirstJobAsStep")
.inputChannel(firstRequests)
.outputChannel(firstReplies)
.parametersExtractor(remoteJobParametersExtractor())
.listener(new SyncStepContextWithJob())
.build();
}
问题就是为什么当我开始第一份工作时会遇到这样的异常。它不应该关心第二个回复,因为我在 JobManagerPartitionConfiguration 类中的示例为第一个作业定义了 inputChannel =“firstReplies”和 outputChannel =“firstRequests”,这意味着它应该使用这些通道而不是第二个通道配置。
您的堆栈跟踪清楚地表明该通道上的问题来自通过 AMQP 入站通道适配器使用的消息:
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:262)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$200(AmqpInboundChannelAdapter.java:69)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.createAndSend(AmqpInboundChannelAdapter.java:397)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:360)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1663)
是的,我看到你确实喜欢这样:
@Bean("secondManagerInBoundFlow")
@Profile("manager")
public IntegrationFlow secondManagerInBoundFlow() {
return queueConfig.getInboundAdapter(true, secondReplies());
}
但是显然仍然有一些活动的
manager
配置文件,这使得这个通道适配器被启动,但是在这个管理器配置文件中你可能没有这个通道的消费者。我们确实可以在您的代码中看到:
@Bean("secondWorkerOutboundFlow")
@Profile("worker")
public IntegrationFlow secondWorkerOutboundFlow() {
return queueConfig.getOutboundAdapter(false, secondReplies());
}
哪一个看起来像
worker
个人资料。
我无法回答你的问题,因为我还无法理解其中的逻辑。