从发行说明(https://spring.io/blog/2017/11/29/spring-integration-5-0-ga-available):
- Reactive Streams支持通过FluxMessageChannel,ReactiveStreamsConsumer和AbstractMessageHandler中的direct org.reactivestreams.Subscriber实现;
我对Reactor支持的理解是例如您可以从变换器/处理程序返回Mono / Flux,Spring Integration会在尊重背压的同时自动将其转换为Messages。不幸的是,我不能让它像那样工作,例如:
IntegrationFlows.from("input")
.handle((p, h) -> Flux.just(1, 2, 3))
.log("l1")
.channel("output")
.get();
仍使用FluxArray类型的有效负载记录一条消息,而不是使用整数有效负载记录三条消息。
2017-12-18 17:12:33.262 INFO 97471 --- [nio-8080-exec-1] l1 : GenericMessage [payload=FluxArray, headers={id=a9701681-9945-f953-8b72-df369c2982a3, timestamp=1513613553262}]
此外,根据此行为和新的文档中没有任何内容
AbstractMessageHandler中的FluxMessageChannel,ReactiveStreamsConsumer和direct org.reactivestreams.Subscriber实现
所以我的问题是,我是否理解正确实施的Reactor支持,哪里可以找到有关该主题的任何信息?
因为我们在这里发送消息并且消息对您从服务返回什么样的有效负载无关紧要,所有内容都只是包含在Message
中。您需要一个特殊组件来了解此有效负载。其中之一是Splitter
。这个确定你的有效载荷是一个Reactive Streams Publisher
并作为Flux
迭代。
另一个组件是WebFluxInboundEndpoint
,它本身支持这种有效载荷。
您的自定义Service Activator可能会将Flux
作为参数来处理。
但没有任何事情会发生Spring Integration支持Reactive类型,但如果没有最终用户首选项,它们不会进行处理。
顺便说一句,splitter
应该提供FluxMessageChannel
作为输出,以通过背压方式处理分裂的Flux
。
随意提出关于记录FluxMessageChannel
的JIRA。的确,我们错过了。 ReactiveStreamsConsumer
也需要更多的爱,我们有一些计划让5.1
改进Reactive Streams模型,我们会尝试使其更灵活,甚至可以选择默认打开它。但是今天没有什么可以承诺。