Starting the first job fails with "Dispatcher has no subscribers for channel" when multiple DirectChannels are declared


I have a Spring Batch application that runs its Spring Batch jobs successfully, but it throws an exception once more than one set of DirectChannel beans is declared.

The exception occurs when I launch "firstJob". Here is the exception:

`Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'IFRSGoodBookService-1.secondReplies'.
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76)
    at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:375)
    at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:346)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:326)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:499)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:354)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:283)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:247)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:142)
    at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
    at org.springframework.integration.handler.AbstractMessageHandler.handleWithMetrics(AbstractMessageHandler.java:90)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:70)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:375)
    at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:346)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:326)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:299)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.lambda$sendMessage$1(MessageProducerSupport.java:262)
    at io.micrometer.observation.Observation.lambda$observe$0(Observation.java:493)
    at io.micrometer.observation.Observation.observeWithContext(Observation.java:603)
    at io.micrometer.observation.Observation.observe(Observation.java:492)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:262)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$200(AmqpInboundChannelAdapter.java:69)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.createAndSend(AmqpInboundChannelAdapter.java:397)
    at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:360)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1663)
    ... 14 common frames omitted
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    ... 50 common frames omitted`

Here is my FlowConfig class, which declares the channels:

@Configuration
@RequiredArgsConstructor
class FlowConfig {

    private final QueueConfig queueConfig;

    @Bean
    public DirectChannel firstRequests() {
        return new DirectChannel();
    }

    @Bean
    public DirectChannel firstReplies() {
        return new DirectChannel();
    }

    @Bean
    public DirectChannel secondRequests() {
        return new DirectChannel();
    }

    @Bean
    public DirectChannel secondReplies() {
        return new DirectChannel();
    }

    @Bean("secondManagerInBoundFlow")
    @Profile("manager")
    public IntegrationFlow secondManagerInBoundFlow() {
        return queueConfig.getInboundAdapter(true, secondReplies());
    }

    @Bean("secondWorkerInBoundFlow")
    @Profile("worker")
    public IntegrationFlow secondInBoundFlow() {
        return queueConfig.getInboundAdapter(false, secondRequests());
    }

    @Bean("secondManagerOutboundFlow")
    @Profile("manager")
    public IntegrationFlow secondManagerOutboundFlow() {
        return queueConfig.getOutboundAdapter(true, secondRequests());
    }

    @Bean("secondWorkerOutboundFlow")
    @Profile("worker")
    public IntegrationFlow secondWorkerOutboundFlow() {
        return queueConfig.getOutboundAdapter(false, secondReplies());
    }
    
    @Bean("firstManagerInBoundFlow")
    @Profile("manager")
    public IntegrationFlow firstManagerInBoundFlow() {
        return queueConfig.getInboundAdapter(true, firstReplies());
    }

    @Bean("firstWorkerInBoundFlow")
    @Profile("worker")
    public IntegrationFlow firstWorkerInBoundFlow() {
        return queueConfig.getInboundAdapter(false, firstRequests());
    }

    @Bean("firstManagerOutboundFlow")
    @Profile("manager")
    public IntegrationFlow firstManagerOutboundFlow() {
        return queueConfig.getOutboundAdapter(true, firstRequests());
    }

    @Bean("firstWorkerOutboundFlow")
    @Profile("worker")
    public IntegrationFlow firstWorkerOutboundFlow() {
        return queueConfig.getOutboundAdapter(false, firstReplies());
    }
}

Here is the implementation of the inbound and outbound adapters:

@Configuration
@ConditionalOnProperty("spring.rabbitmq.enabled")
@RequiredArgsConstructor
public class RabbitMqQueueConfig implements QueueConfig {

    private final ConnectionFactory connectionFactory;
    private final RabbitTemplate defaultRabbitTemplate;
    private final QueueConstants queueConstants;

    @Override
    public IntegrationFlow getInboundAdapter(boolean isManager, DirectChannel channel) {
        String queueName = isManager ? queueConstants.getConstantWithPrefix(QueueConstants.JOB_REPLIES_QUEUE)
                : queueConstants.getConstantWithPrefix(QueueConstants.JOB_REQUESTS_QUEUE);
        return IntegrationFlow.from(Amqp.inboundAdapter(connectionFactory, queueName)).channel(channel).get();
    }

    @Override
    public IntegrationFlow getOutboundAdapter(boolean isManager, DirectChannel channel) {
        String queueName = isManager ? queueConstants.getConstantWithPrefix(QueueConstants.JOB_REQUESTS_QUEUE)
                : queueConstants.getConstantWithPrefix(QueueConstants.JOB_REPLIES_QUEUE);
        AmqpOutboundChannelAdapterSpec messageHandlerSpec = Amqp.outboundAdapter(defaultRabbitTemplate).routingKey(queueName);
        return IntegrationFlow.from(channel).handle(messageHandlerSpec).get();
    }
}

Here is the JobManager configuration:

@Configuration
@Profile("manager")
@EnableBatchIntegration
@AllArgsConstructor
public class JobManagerPartitionConfiguration {

    private final JobRepository jobRepository;
    private final RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory;
    private PlatformTransactionManager transactionManager;
    private DeleteDataTasklet deleteDataTasklet;
    private InstDataLoader instDataLoader;
    private final ApplicationProperties appProperties;
    private final DirectChannel firstRequests;
    private final DirectChannel firstReplies;
    private final DirectChannel secondRequests;
    private final DirectChannel secondReplies;
    private final ManagerJobListener managerJobListener;
    private final IdBoundaryPartitioner idBoundaryPartitioner;
    private final ContextService contextService;

    @Bean
    public Step firstJobManagerStep() {
        return managerStepBuilderFactory.get("firstJobManagerStep")
                .partitioner("remoteFirstJobAsStep", idBoundaryPartitioner)
                .gridSize(appProperties.getJobParameters().getGridSize())
                .outputChannel(firstRequests)
                .inputChannel(firstReplies)
                .listener(new SyncStepContextWithJob())
                .build();
    }

    @Bean
    public Step secondJobManagerStep() {
        return managerStepBuilderFactory.get("secondJobManagerStep")
                .partitioner("remoteSecondJobAsStep", idBoundaryPartitioner)
                .gridSize(appProperties.getJobParameters().getGridSize())
                .outputChannel(secondRequests)
                .inputChannel(secondReplies)
                .listener(new SyncStepContextWithJob())
                .build();
    }

    @Bean
    public Job secondJob(Step secondJobManagerStep) {
        return new JobBuilder("secondJob", jobRepository).incrementer(new RunIdIncrementer())
                .start(instDataLoaderStep())
                .next(deleteTable())
                .next(secondJobManagerStep)
                .listener(contextService)
                .listener(managerJobListener)
                .build();
    }

    @Bean
    public Job firstJob(Step firstJobManagerStep) {
        return new JobBuilder("firstJob", jobRepository).incrementer(new RunIdIncrementer())
                .start(instDataLoaderStep())
                .next(deleteTable())
                .next(firstJobManagerStep)
                .listener(contextService)
                .listener(managerJobListener)
                .build();
    }

Here is my JobWorker configuration:

@Configuration
@Profile("worker")
@EnableBatchIntegration
@AllArgsConstructor
@Slf4j
public class JobWorkerPartitionConfiguration {

    private RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory;
    private JobRepository jobRepository;
    private JobCache<I9Data[]> cache;
    private CacheJobListener<I9Data[]> jobListener;
    private WorkerJobListener workerJobListener;
    private StepMonitoringListener monitoringListener;
    private ApplicationProperties appProperties;
    private InstDataLoader instDataLoader;
    private ContractLoader contractLoader;
    private CacheItemWriter cacheItemWriter;
    private PlatformTransactionManager transactionManager;
    private InstDataJpaCache instDataJpaCache;
    private ContextService contextService;
    private MonitorService monitorService;


    @Bean
    @StepScope
    Job remoteFirstJob(NamedParameterJdbcTemplate jdbcTemplate) {
        return new JobBuilder("remoteFirstJob", jobRepository).start(instDataLoaderStep())
                .next(contractLoaderTaskletStep())
                .next(calculateStep())
                .listener(contextService)
                .listener(jobListener)
                .listener(workerJobListener)
                .listener(monitoringListener)
                // .listener(new SyncStepContextWithJob(this.monitorService))
                .build();
    }

    

    @Bean
    public Step remoteFirstJobAsStep(
            DirectChannel firstRequests,
            DirectChannel firstReplies

    ) {
        return workerStepBuilderFactory.get("remoteFirstJobAsStep")
                .inputChannel(firstRequests)
                .outputChannel(firstReplies)
                .parametersExtractor(remoteJobParametersExtractor())
                .listener(new SyncStepContextWithJob())
                .build();
    }

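The question is why I get this exception when I start the first job. It should not care about secondReplies: in the JobManagerPartitionConfiguration class I defined inputChannel = "firstReplies" and outputChannel = "firstRequests" for the first job, which means it should use those channels rather than the second channel configuration.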

java spring spring-batch spring-integration
1 Answer

Your stack trace clearly shows that the failure on that channel is caused by a message consumed through the AMQP inbound channel adapter:

at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:262)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$200(AmqpInboundChannelAdapter.java:69)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.createAndSend(AmqpInboundChannelAdapter.java:397)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:360)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1663)

And yes, I see that you declare it like this:

@Bean("secondManagerInBoundFlow")
@Profile("manager")
public IntegrationFlow secondManagerInBoundFlow() {
    return queueConfig.getInboundAdapter(true, secondReplies());
}

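But apparently the "manager" profile is active, so this channel adapter is started, and within that manager profile you probably have no consumer for this channel. Indeed, we can see this in your code: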

@Bean("secondWorkerOutboundFlow")
@Profile("worker")
public IntegrationFlow secondWorkerOutboundFlow() {
    return queueConfig.getOutboundAdapter(false, secondReplies());
}

which is apparently declared under the "worker" profile.
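To illustrate the mechanism behind the exception: a DirectChannel hands each message to a subscribed handler in the sender's thread, and the send fails when nothing is subscribed. The plain-Java sketch below is only a simplified model of that behavior; DirectChannelModel is a hypothetical class written for this illustration, not Spring Integration code.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Simplified stand-in for a point-to-point channel.
// This is NOT Spring Integration's DirectChannel, only a model of its observable behavior.
final class DirectChannelModel {

    private final String name;
    private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();

    DirectChannelModel(String name) {
        this.name = name;
    }

    // Registers a handler; in Spring Integration an endpoint of a started flow does this.
    void subscribe(Consumer<String> handler) {
        subscribers.add(handler);
    }

    // Hands the message to a subscriber in the caller's thread, or fails if there is none.
    void send(String payload) {
        if (subscribers.isEmpty()) {
            throw new IllegalStateException(
                    "Dispatcher has no subscribers for channel '" + name + "'");
        }
        subscribers.get(0).accept(payload);
    }

    public static void main(String[] args) {
        DirectChannelModel secondReplies = new DirectChannelModel("secondReplies");
        try {
            secondReplies.send("reply"); // nothing is subscribed yet
        } catch (IllegalStateException e) {
            System.out.println("Send failed: " + e.getMessage());
        }
        secondReplies.subscribe(p -> System.out.println("Handled: " + p));
        secondReplies.send("reply"); // now delivered to the handler
    }
}
```

In the real application the sender is the AMQP inbound channel adapter, so the failure surfaces in the listener container thread, as the stack trace shows.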

I cannot fully answer your question, because I do not understand the logic behind this setup yet.
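One detail in the posted RabbitMqQueueConfig may be relevant, although this is only a possible explanation and not a confirmed diagnosis: getInboundAdapter derives the queue name from isManager alone, so firstManagerInBoundFlow and secondManagerInBoundFlow both consume from the same JOB_REPLIES_QUEUE. When several consumers share one RabbitMQ queue, the broker distributes the messages among them, so a reply that belongs to firstJob can be handed to the adapter that forwards to secondReplies. The plain-Java toy below models only that distribution; SharedQueueModel and deliver are hypothetical names, and strict round-robin delivery is a simplifying assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of several consumers sharing one queue; not RabbitMQ or Spring code.
final class SharedQueueModel {

    // Assigns each message to a consumer in round-robin order (simplifying assumption)
    // and returns one "consumer <- message" record per delivery.
    static List<String> deliver(List<String> messages, List<String> consumers) {
        List<String> log = new ArrayList<>();
        for (int i = 0; i < messages.size(); i++) {
            String consumer = consumers.get(i % consumers.size());
            log.add(consumer + " <- " + messages.get(i));
        }
        return log;
    }

    public static void main(String[] args) {
        // Both manager inbound flows listen on the same replies queue.
        List<String> consumers = List.of("firstManagerInBoundFlow", "secondManagerInBoundFlow");
        // Only firstJob is running, so every reply belongs to firstJob.
        List<String> replies = List.of("firstJob reply 0", "firstJob reply 1",
                "firstJob reply 2", "firstJob reply 3");
        deliver(replies, consumers).forEach(System.out::println);
    }
}
```

If this is what happens, giving each job its own request and reply queues (or using a single inbound flow per queue) would keep firstJob replies away from secondReplies; whether that matches the intended design is for the author to confirm.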
