我想为以下流程创建IntegrationFlow流程。
@Bean
public IntegrationFlow buildCart() {
return f -> f.handle(validate, "buildPreCheck")
.handle(preProcessProcessor)
.handle(getOffersProcessor)
.handle(buildItems)
**.wireTap(log())**
.handle(validateItems)
.handle(deliver);
}
编辑:
嗨Artem,我添加了Wire Tap,如下面的代码所示。它仍然将WireTap节点排除为Sequencal并等待该节点。
请帮助使其成为异步节点。
@Bean
public IntegrationFlow log() {
return f -> f.handle(auditProcessor).channel("nullChannel");
}
@ServiceActivator
@Description("Call and get the Offers Request")
public void getDetails(Message<Context> message) throws InterruptedException {
log.info("getDetails-Sleep-Start");
Thread.sleep(3000);
log.info("getDetails-Sleep-End");
}
使用Spring Integration Java DSL,我们都错过了Spring Integration中一个非常重要的组件是MessageChannel
。事实是,只要我们需要超过默认的DirectChannel
,就可以将通道添加到流程中。对于异步执行,我们有一个ExecutorChannel
。但是在我们进行分叉流程之前,我们需要以某种方式跳到那里而不会破坏主要的流程。就EIP而言,这称为Wire-Tap:https://www.enterpriseintegrationpatterns.com/patterns/messaging/WireTap.html。
Spring Integration Java DSL在流程中提出了类似.wireTap()
运算符的实现。审计逻辑可以在抽头子流程中或通过渠道实现,但不要忘记ExecutorChannel
!
您可以在参考手册中查看更多信息:https://docs.spring.io/spring-integration/reference/html/java-dsl.html#java-dsl-wiretap
UPDATE
该
.handle(buildItems)
.wireTap(log())
是正确的方法:你将审核buildItems
的结果,然后继续下一步。
必须像这样修改log()
:
@Bean
public IntegrationFlow log() {
return f -> f.channel(c -> c.executor(taskExecutorBean())).handle(auditProcessor).channel("nullChannel");
}
注意c.executor()
。这样我们就为我们的log()
子流添加了一个异步手。