我有一个 spring boot 应用程序,并且有一个要求,我需要轮询数据库以获取某些数据,并在从 db 接收到该数据后,轮询另一个系统以获取更多数据。我正在使用 spring 集成并定义了以下组件 -
频道:
适配器和服务激活器:
@Bean
@InboundChannelAdapter(channel = "eventChannel", poller = @Poller(fixedDelay = "1000", taskExecutor = "executor"),
autoStartup = "false")
@EndpointId("jdbcPollingAdapter")
public JdbcPollingChannelAdapter pollingChannelAdapter() {
JdbcPollingChannelAdapter adapter = new JdbcPollingChannelAdapter(this.dataSource, SQL);
adapter.setRowMapper((rs, index) -> new TestEvent(rs.getString("eventName"), rs.getString("eventStatus"),
rs.getTimestamp("createdDate"), rs.getTimestamp("updatedDate")));
return adapter;
}
@Bean
@InboundChannelAdapter(channel = "graphQLChannel", poller = @Poller(fixedRate = "1000"), autoStartup = "false")
@EndpointId("graphQLPollingAdapter")
public GraphQLPollingService graphQLPollingService() {
return new GraphQLPollingService();
}
@Bean
@ServiceActivator(inputChannel = "controlChannel")
public ExpressionControlBusFactoryBean controlBus() {
return new ExpressionControlBusFactoryBean();
}
集成流程:
@Bean
public IntegrationFlow flow1() {
return IntegrationFlow.from(eventChannel)
.filter(eventFilter)
.handle(msg -> {
trigger("@jdbcPollingAdapter.stop()");
trigger("@graphQLPollingAdapter.start()");
})
.get();
}
private void trigger(String expression) {
controlChannel.send(new GenericMessage<>(expression));
}
@Bean
public IntegrationFlow discardFlow() {
return IntegrationFlow.from(discardChannel)
.handle(msg -> log.info("Discarded msg: {}", msg))
.get();
}
@Bean
public IntegrationFlow graphQLFlow() {
return IntegrationFlow.from(graphQLChannel)
.filter(graphQLFilter)
.handle(msg -> {
trigger("@graphQLPollingAdapter.stop()");
taskStatusChannel.send(new GenericMessage<>("COMPLETED"));
})
.get();
}
我通过 REST 调用启动 JDBC 轮询适配器,但调用立即返回。我希望调用等待所有流程完成然后返回。
总而言之,我想要这样的东西-
1. REST call to RestController
2. RestController (start jdbc polling adapter and wait)
3. when data received [1. stop jdbc polling adapter 2. start systemA polling adapter]
4. when systemA data received [1. stop systemA polling adapter]
5. RestController returns 200 OK
如何实现? 谢谢你的回答!
是的......这不是应该使用入站通道适配器的方式。这听起来更像是您通过 REST API 触发了一些操作,然后您需要进行一些处理并返回回复。因此,您需要从
Http.inboundGateway()
开始,然后是 handle()
为 JdbcOutboundGateway
,然后是 handle()
为 GraphQl.gateway()
就是这样:您将其结果返回给 HTTP 请求。 HTTP 可以替换为 WebFlux
以更好地处理来自 GraphQl.gateway()
. 的反应性回复
调用
stop()/start()
进行流量控制从业务逻辑上看是不正确的