Spring Integration Java DSL 流程中的通知器(即发即弃)模式

问题描述 投票:0回答:1

我是第一次使用 Spring Integration,所以我学得很快。

我有一个工作场景,其中我必须激活多个子流,但我不能中断主流或使主流以子流的结果为条件

我制作了一个图表,与我必须做的几乎相似

  • 源只是一个网关对象
  • 我转换一些 XML 并创建一个 DTO
  • 我根据某些系统条件进行过滤(如果过滤失败,是的,消息必须中止)
  • 我需要激活反馈子流程
    • 仅部分留言可以反馈
    • 创建新的反馈消息
    • 将其序列化(有 XML 部分)
    • 发送到通道“feedbackChannel”,即 JMS 到 AMQP
  • 之前的流程必须始终继续第 3 步中的原始消息
  • 消息已序列化
  • 消息已使用 HMAC 签名
  • 激活一个新的子流,需要记录当前时间以用于超时目的(是的,子流太过分了)
    • 提取相关信息,例如消息ID
    • 将此信息发送给消息消费者
    • 到此结束,无需回复任何人
  • 主流程使用来自 HMAC 步骤的原始消息恢复
  • 通过“mainChannel”(即 JMS)发送到 IBM MQ 服务器

我使用 Java DSL 创建了流定义。由于我无法从虚拟桌面复制/粘贴,所以我会用通用语法写下


@Bean
public IntegrationFlow mainFlow(... autowired handlers...) {
    return IntegrationFlow.from(MessageChannels.direct())
    .log(...)
    .transform(transformer::transform)
    .filter(filter::filter)
    .enrichHeaders(....) //yes, not documented in the diagram 😜
    .transform(...)
    .gateway(sendFeedback, g->g.requiresReply(false))
    ......
    .channel("mainChannel")
    .get();
}

@Bean
protected IntegrationFlow sendFeedbackFlow(...) {
    return IntegrationFlow.from(MessageChannels.queue())
    .filter(...)
    .handle(...)
    .channel("feedbackChannel")
    .get();
}


sendFeedback
流只会转发那些符合特定条件的消息,这是设计使然。
.gateway
.wireTap
(窃听听起来很棒,也是我第一次尝试)的问题是,如果消息被子流过滤掉,那么就意味着主流将不会继续。实际上就像命令式编程中的子例程调用。

我需要的是一个流程通知另一个流程并继续无论如何。

我提供的唯一解决方案如下,但我希望有一个更干净的 DSL

.handle((payload,headers) -> {
    sendFeedbackFlow.getInputChannel().send(MessageBuilder.withPayload(payload).copyHeaders(headers).build();
    return payload; //this is dirty for me!
})

我已经明确告诉我的流程继续使用原始有效负载,这并不是什么坏事。我相信 Spring 缺少使用

void
返回类型的消息的处理程序。

问题是:每当我必须通知(并忘记)有关消息的辅助流或网关组件、服务、通道等并使用上一条消息继续我的流时,正确的语法应该是什么使用?

spring-integration-dsl
1个回答
0
投票

我想你正在寻找的是

.publishSubscribeChannel(publishSubscribeSpec -> publishSubscribeSpec
                    .subscribe(flow -> flow.channel(Objects.requireNonNull(sendFeedbackFlow().getInputChannel())))
                    .subscribe(flow -> flow.channel(Objects.requireNonNull(nextStepInYourMainFlow.getInputChannel()))))
© www.soinside.com 2019 - 2024. All rights reserved.