如何为反应式 Spring Integration 流程编写过滤器?

问题描述 投票:0回答:1

我在我的集成流程中使用 RSocket 作为

InboundGateway
,使用交互模型
REQUEST_CHANNEL
。它发出
Flux
我的业务对象。

我可以使用 RSocket

OutboundGateways
“装饰”这些对象,但我无法在我的
IntegrationFlow
.

中添加过滤器或路由器步骤

GenericSelector
需要布尔值,所以我不能将它用于
Flux
recipientFlow
的情况类似。现在谷歌搜索也没有帮助。

是否有此用例的示例,或者我是否需要使用

split()
回退到对象?

感谢任何提示!

(我有大量对象,这就是为什么我想构建一个完全响应式流。)

spring-integration project-reactor spring-integration-dsl
1个回答
0
投票

很高兴看到您的一些代码,但听起来您来自

RSocketInboundGateway
的有效负载是
Flux
。因此,是的,在这种情况下,
split()
是一个不错的选择,可以将
Flux
中的每个项目作为单独的消息发送给下游处理。 这个
split()
的输出通道可以是一个
FluxMessageChannel
来让这个分离器结果的消费者成为一个反应者。

如果你不喜欢

split()
,你可以看看
transform()
Flux
有效载荷:

.<Flux<String>, Flux<String>>transform((flux) -> flux.map(String::toLowerCase))

所以,那么你可以使用

filter()
API 中的
Flux

route()
是一个 Spring Integration 操作,用于决定下一步将消息发送到哪里。有点分支。不知道
Flux
里面有没有类似的东西,但是如果你还需要根据
Flux
里面的items做路由,那么只有前面提到的
split()
可以帮到你了

© www.soinside.com 2019 - 2024. All rights reserved.