我们在项目中使用spring webflux。在我们的项目中,我们通过websocket接收消息,然后根据消息的类型选择一条消息处理管道。与反应堆一样,可以使用多个运算符来完成任务,而我们面临选择使用groupby或组合使用滤镜和ConnectableFlux来实现相同目标的选择。下面是两种选择的详细说明
public Mono<Void> handle(WebSocketSession session) {
ConnectableFlux<WebSocketMessage> connectable = session.receive().publish();
connectable.filter(msg -> msg.getPayloadAsText().equals("sub1"))
.subscribe((s) -> System.out.println("Subscriber1"));
connectable.filter(msg -> msg.getPayloadAsText().equals("sub2"))
.subscribe((s) -> System.out.println("Subscriber1"));
connectable.connect();
return connectable.then();
}
public Mono<Void> handle(WebSocketSession session) {
return session.receive().groupBy(msg -> {
if (msg.getPayloadAsText().equals("sub1")) {
return 1;
} else if (msg.getPayloadAsText().equals("sub2")) {
return 2;
}
}).flatMap(groupedFlux -> {
if (groupedFlux.key() == 1) {
groupedFlux.subscribe((s) -> System.out.println("Subscriber1"));
} else if (groupedFlux.key() == 2) {
groupedFlux.subscribe((s) -> System.out.println("Subscriber1"));
}
return groupedFlux;
}).then();
}
我们的困境是最适合我们目的的运营商。哪一位操作员会更出色,更重要的是,他们会选择哪种才是正确的用法。请提供您的意见。
我建议使用publish
,因为flatMap
渴望而publish
尊重背压。
Flux#publish
:
准备共享此
ConnectableFlux
序列的Flux
,并以感知背压的方式将值分发给订户。预取将默认为Queues.SMALL_BUFFER_SIZE
。
可以使用reactor.bufferSize.small
属性调整预取。
Flux#flatMap
:
此运算符的三个维度可以与
flatMapSequential
和concatMap
进行比较:
- 内部和订阅的生成: 此运算符急切地订阅其内部。