Rxjava Scheduler.trampoline与concatmap

问题描述 投票:0回答:1

似乎基于文档,Scheduler.trampoline确保元素发出先进先出(即按顺序)。似乎concat映射的目的是确保所有内容都适当地排列然后发出。因此,我想知道是否在应用subscribeOn ./。observeOn(Scheduler.trampoline())时,然后再执行concatmap运算符,而不是进行常规映射操作,是否有其意义。

rx-java2
1个回答
0
投票

是的,有一点。举个例子:

Observable.just(1, 2, 3, 4, 5)
    .subscribeOn(Schedulers.trampoline())
    .flatMap(
        a -> {
          if (a < 3) {
            return Observable.just(a).delay(3, TimeUnit.SECONDS);
          } else {
            return Observable.just(a);
          }
        })
    .doOnNext(
        a -> System.out.println("Element: " + a + ", on: " + Thread.currentThread().getName()))
    .subscribe();

这是输出:

Element: 3, on: main
Element: 4, on: main
Element: 5, on: main
Element: 1, on: RxComputationScheduler-1
Element: 2, on: RxComputationScheduler-2

这里发生的是1和2依次到达flatMap运算符。但是现在,这些元素的内部流延迟了3秒。注意,flatMap急于向内部流传递subscribes。也就是说,它不等到onComplete到下一个内部流(concatMap所做的工作)之前就完成一个流(subscribing)。

因此1和2的内部流延迟了3秒。您可以说这是一个外部I / O调用,需要花费一些时间。同时,接下来的3个元素(3,4,5)输入flatMap,它们的流立即结束。这就是为什么您在输出中看到保持顺序的原因。

然后3秒钟过去,并且发射元素1和2。注意,不能保证1会比2早。

现在用flatMap替换concatMap,您会看到顺序保持不变:

Element: 1, on: RxComputationScheduler-1
Element: 2, on: RxComputationScheduler-2
Element: 3, on: RxComputationScheduler-2
Element: 4, on: RxComputationScheduler-2
Element: 5, on: RxComputationScheduler-2

为什么?因为这就是concatMap的工作方式。元素1出现,并在I / O调用中使用。与其内部流相对应的内部流将发出onComplete,这需要3秒钟。在第一个流发出onComplete之前,concatMap不会订阅与其余元素相对应的内部流。这样做后,下一个流(Observable.just(2).delay(3, TimeUnit.SECONDS))就是subscribed,依此类推。这样您就可以了解如何维护订单。

关于这两个运算符,您需要记住的是:flatMap急于在元素到达时向内部流subscribes传递。另一方面,concatMap等待一个流完成,然后再将其subscribes发送到下一个流。这就是为什么您无法使用concatMap进行并行调用的原因。

© www.soinside.com 2019 - 2024. All rights reserved.