我在我的集成流程中使用 RSocket 作为
InboundGateway
,使用交互模型 REQUEST_CHANNEL
。它发出Flux
我的业务对象。
我可以使用 RSocket
OutboundGateways
“装饰”这些对象,但我无法在我的IntegrationFlow
.中添加过滤器或路由器步骤
GenericSelector
需要布尔值,所以我不能将它用于Flux
。 recipientFlow
的情况类似。现在谷歌搜索也没有帮助。
是否有此用例的示例,或者我是否需要使用
split()
回退到对象?
感谢任何提示!
(我有大量对象,这就是为什么我想构建一个完全响应式流。)
很高兴看到您的一些代码,但听起来您来自
RSocketInboundGateway
的有效负载是 Flux
。因此,是的,在这种情况下,split()
是一个不错的选择,可以将 Flux
中的每个项目作为单独的消息发送给下游处理。
这个 split()
的输出通道可以是一个 FluxMessageChannel
来让这个分离器结果的消费者成为一个反应者。
如果你不喜欢
split()
,你可以看看transform()
对Flux
有效载荷:
.<Flux<String>, Flux<String>>transform((flux) -> flux.map(String::toLowerCase))
所以,那么你可以使用
filter()
API 中的 Flux
。
route()
是一个 Spring Integration 操作,用于决定下一步将消息发送到哪里。有点分支。不知道Flux
里面有没有类似的东西,但是如果你还需要根据Flux
里面的items做路由,那么只有前面提到的split()
可以帮到你了