我是第一次使用 Spring Integration,所以我学得很快。
我有一个工作场景,其中我必须激活多个子流,但我不能中断主流或使主流以子流的结果为条件
我制作了一个图表,与我必须做的几乎相似
我使用 Java DSL 创建了流定义。由于我无法从虚拟桌面复制/粘贴,所以我会用通用语法写下
@Bean
public IntegrationFlow mainFlow(... autowired handlers...) {
return IntegrationFlow.from(MessageChannels.direct())
.log(...)
.transform(transformer::transform)
.filter(filter::filter)
.enrichHeaders(....) //yes, not documented in the diagram 😜
.transform(...)
.gateway(sendFeedback, g->g.requiresReply(false))
......
.channel("mainChannel")
.get();
}
@Bean
protected IntegrationFlow sendFeedbackFlow(...) {
return IntegrationFlow.from(MessageChannels.queue())
.filter(...)
.handle(...)
.channel("feedbackChannel")
.get();
}
sendFeedback
流只会转发那些符合特定条件的消息,这是设计使然。 .gateway
和.wireTap
(窃听听起来很棒,也是我第一次尝试)的问题是,如果消息被子流过滤掉,那么就意味着主流将不会继续。实际上就像命令式编程中的子例程调用。
我需要的是一个流程通知另一个流程并继续无论如何。
我提供的唯一解决方案如下,但我希望有一个更干净的 DSL
.handle((payload,headers) -> {
sendFeedbackFlow.getInputChannel().send(MessageBuilder.withPayload(payload).copyHeaders(headers).build();
return payload; //this is dirty for me!
})
我已经明确告诉我的流程继续使用原始有效负载,这并不是什么坏事。我相信 Spring 缺少使用
void
返回类型的消息的处理程序。
问题是:每当我必须通知(并忘记)有关消息的辅助流或网关组件、服务、通道等并使用上一条消息继续我的流时,正确的语法应该是什么使用?
我想你正在寻找的是
.publishSubscribeChannel(publishSubscribeSpec -> publishSubscribeSpec
.subscribe(flow -> flow.channel(Objects.requireNonNull(sendFeedbackFlow().getInputChannel())))
.subscribe(flow -> flow.channel(Objects.requireNonNull(nextStepInYourMainFlow.getInputChannel()))))