启动spring integration channel adapter,等待流程完成

问题描述 投票:0回答:1

我有一个 spring boot 应用程序,并且有一个要求,我需要轮询数据库以获取某些数据,并在从 db 接收到该数据后,轮询另一个系统以获取更多数据。我正在使用 spring 集成并定义了以下组件 -

频道:

  1. 偶数频道
  2. 控制频道
  3. 系统频道
  4. taskStatusChannel (QueueChannel)
  5. 丢弃频道

适配器和服务激活器:

    @Bean
    @InboundChannelAdapter(channel = "eventChannel", poller = @Poller(fixedDelay = "1000", taskExecutor = "executor"),
            autoStartup = "false")
    @EndpointId("jdbcPollingAdapter")
    public JdbcPollingChannelAdapter pollingChannelAdapter() {
        JdbcPollingChannelAdapter adapter = new JdbcPollingChannelAdapter(this.dataSource, SQL);
        adapter.setRowMapper((rs, index) -> new TestEvent(rs.getString("eventName"), rs.getString("eventStatus"),
                rs.getTimestamp("createdDate"), rs.getTimestamp("updatedDate")));

        return adapter;
    }

    @Bean
    @InboundChannelAdapter(channel = "graphQLChannel", poller = @Poller(fixedRate = "1000"), autoStartup = "false")
    @EndpointId("graphQLPollingAdapter")
    public GraphQLPollingService graphQLPollingService() {
        return new GraphQLPollingService();
    }

    @Bean
    @ServiceActivator(inputChannel = "controlChannel")
    public ExpressionControlBusFactoryBean controlBus() {
        return new ExpressionControlBusFactoryBean();
    }

集成流程:

    @Bean
    public IntegrationFlow flow1() {
        return IntegrationFlow.from(eventChannel)
                              .filter(eventFilter)
                              .handle(msg -> {
                                  trigger("@jdbcPollingAdapter.stop()");
                                  trigger("@graphQLPollingAdapter.start()");
                              })
                              .get();

    }

    private void trigger(String expression) {
        controlChannel.send(new GenericMessage<>(expression));
    }

    @Bean
    public IntegrationFlow discardFlow() {
        return IntegrationFlow.from(discardChannel)
                              .handle(msg -> log.info("Discarded msg: {}", msg))
                              .get();
    }

    @Bean
    public IntegrationFlow graphQLFlow() {
        return IntegrationFlow.from(graphQLChannel)
                              .filter(graphQLFilter)
                              .handle(msg -> {
                                  trigger("@graphQLPollingAdapter.stop()");
                                  taskStatusChannel.send(new GenericMessage<>("COMPLETED"));
                              })
                              .get();
    }

我通过 REST 调用启动 JDBC 轮询适配器,但调用立即返回。我希望调用等待所有流程完成然后返回。

总而言之,我想要这样的东西-

1. REST call to RestController 
2. RestController (start jdbc polling adapter and wait) 
3. when data received [1. stop jdbc polling adapter 2. start systemA polling adapter] 
4. when systemA data received [1. stop systemA polling adapter] 
5. RestController returns 200 OK

如何实现? 谢谢你的回答!

java spring-boot spring-integration
1个回答
0
投票

是的......这不是应该使用入站通道适配器的方式。这听起来更像是您通过 REST API 触发了一些操作,然后您需要进行一些处理并返回回复。因此,您需要从

Http.inboundGateway()
开始,然后是
handle()
JdbcOutboundGateway
,然后是
handle()
GraphQl.gateway()
就是这样:您将其结果返回给 HTTP 请求。 HTTP 可以替换为
WebFlux
以更好地处理来自
GraphQl.gateway()
.

的反应性回复

调用

stop()/start()
进行流量控制从业务逻辑上看是不正确的

© www.soinside.com 2019 - 2024. All rights reserved.