似乎基于文档,Scheduler.trampoline确保元素发出先进先出(即按顺序)。似乎concat映射的目的是确保所有内容都适当地排列然后发出。因此,我想知道是否在应用subscribeOn ./。observeOn(Scheduler.trampoline())时,然后再执行concatmap运算符,而不是进行常规映射操作,是否有其意义。
是的,有一点。举个例子:
Observable.just(1, 2, 3, 4, 5)
.subscribeOn(Schedulers.trampoline())
.flatMap(
a -> {
if (a < 3) {
return Observable.just(a).delay(3, TimeUnit.SECONDS);
} else {
return Observable.just(a);
}
})
.doOnNext(
a -> System.out.println("Element: " + a + ", on: " + Thread.currentThread().getName()))
.subscribe();
这是输出:
Element: 3, on: main
Element: 4, on: main
Element: 5, on: main
Element: 1, on: RxComputationScheduler-1
Element: 2, on: RxComputationScheduler-2
这里发生的是1和2依次到达flatMap
运算符。但是现在,这些元素的内部流延迟了3秒。注意,flatMap
急于向内部流传递subscribes
。也就是说,它不等到onComplete
到下一个内部流(concatMap所做的工作)之前就完成一个流(subscribing
)。
因此1和2的内部流延迟了3秒。您可以说这是一个外部I / O调用,需要花费一些时间。同时,接下来的3个元素(3,4,5)输入flatMap
,它们的流立即结束。这就是为什么您在输出中看到保持顺序的原因。
然后3秒钟过去,并且发射元素1和2。注意,不能保证1会比2早。
现在用flatMap
替换concatMap
,您会看到顺序保持不变:
Element: 1, on: RxComputationScheduler-1 Element: 2, on: RxComputationScheduler-2 Element: 3, on: RxComputationScheduler-2 Element: 4, on: RxComputationScheduler-2 Element: 5, on: RxComputationScheduler-2
为什么?因为这就是
concatMap
的工作方式。元素1出现,并在I / O调用中使用。与其内部流相对应的内部流将发出onComplete
,这需要3秒钟。在第一个流发出onComplete之前,concatMap
不会订阅与其余元素相对应的内部流。这样做后,下一个流(Observable.just(2).delay(3, TimeUnit.SECONDS)
)就是subscribed
,依此类推。这样您就可以了解如何维护订单。关于这两个运算符,您需要记住的是:
flatMap
急于在元素到达时向内部流subscribes
传递。另一方面,concatMap
等待一个流完成,然后再将其subscribes
发送到下一个流。这就是为什么您无法使用concatMap
进行并行调用的原因。