groupby和具有无限流的过滤器之间的选择

问题描述 投票:0回答:1

我们在项目中使用spring webflux。在我们的项目中,我们通过websocket接收消息,然后根据消息的类型选择一条消息处理管道。与反应堆一样,可以使用多个运算符来完成任务,而我们面临选择使用groupby或组合使用滤镜和ConnectableFlux来实现相同目标的选择。下面是两种选择的详细说明

  • 过滤器我们使用Flux.publish()广播传入的流,然后将多个订阅者放在ConnectableFlux上。每个订户首先进行过滤以检查是否需要进一步处理流。添加所有订户后,我们在ConnectableFlux上调用connect。这是示例代码
 public Mono<Void> handle(WebSocketSession session) {
    ConnectableFlux<WebSocketMessage> connectable = session.receive().publish();
    connectable.filter(msg -> msg.getPayloadAsText().equals("sub1"))
        .subscribe((s) -> System.out.println("Subscriber1"));
    connectable.filter(msg -> msg.getPayloadAsText().equals("sub2"))
        .subscribe((s) -> System.out.println("Subscriber1"));
    connectable.connect();
    return connectable.then();
  }
  • groupby另一种选择是通过应用groupby运算符拆分传入的流,然后在平面图内进行过滤和订阅。这是示例代码
public Mono<Void> handle(WebSocketSession session) {
    return  session.receive().groupBy(msg -> {
      if (msg.getPayloadAsText().equals("sub1")) {
        return 1;
      } else if (msg.getPayloadAsText().equals("sub2")) {
        return 2;
      }
    }).flatMap(groupedFlux -> {
      if (groupedFlux.key() == 1) {
        groupedFlux.subscribe((s) -> System.out.println("Subscriber1"));
      } else if (groupedFlux.key() == 2) {
        groupedFlux.subscribe((s) -> System.out.println("Subscriber1"));
      }
      return groupedFlux;
    }).then();
  }

我们的困境是最适合我们目的的运营商。哪一位操作员会更出色,更重要的是,他们会选择哪种才是正确的用法。请提供您的意见。

java spring-webflux project-reactor reactor
1个回答
0
投票

我建议使用publish,因为flatMap渴望而publish尊重背压。

Flux#publish

准备共享此ConnectableFlux序列的Flux,并以感知背压的方式将值分发给订户。预取将默认为Queues.SMALL_BUFFER_SIZE

可以使用reactor.bufferSize.small属性调整预取。

Flux#flatMap

此运算符的三个维度可以与flatMapSequentialconcatMap进行比较:

  • 内部和订阅的生成: 此运算符急切地订阅其内部。
© www.soinside.com 2019 - 2024. All rights reserved.