RxJava —在subscribeOn()线程上发出

问题描述 投票:0回答:1

我有以下代码:

Single.create { emitter ->
   // I/O thread here

   ThirdPartySDK.doSomeAction {
       // Main thread here

      emitter.onSuccess(someValue)
   }
}
.flatMap {
  someOtherSingle(it) // Executes on main thread
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({},{})

ThirdPartySDK.doSomeAction回调在主线程上发布,因此,发射器也将在主线程上发出,而不是在订阅线程上发出(并且如果我在flatMap中有进一步的网络交互,则链失败)。

如果我在第一个observeOn(Schedulers.io())之后添加Single,它将切换到正确的线程,但是有没有办法在正确的线程上发出信号?我无法修改ThirdPartySDK行为。

android rx-java rx-java2
1个回答
0
投票
subscribeOn

subscribeActual lambda将在给定的调度程序上被调用

observeOn

将线程切换到给定的调度程序。每个上游onNext调用都将通过ObserveOn-Scheduler-Thread进行调用

您已经说过,subscribeOn将仅在给定Scheduler-Thread上对subscribe调用subscribeActual方法调用。这并不意味着下游发射将在同一线程上。在您的情况下,onSuccess发出将从另一个线程(例如,数据库/ Http-ThreadPool等)中调用。

onSuccess将从未知线程(在您的情况下为主线程)中调用。下游调用将从主线程调用。因此,从主线程调用flatMap。 flatMap中的主线程上的网络调用可能会失败,因为不允许它“阻塞”主线程。

如何解决此问题?只需在Single#create之后放置一个observeOn。主线程调用onSucess。观察者-订阅者将从主线程中调用。 observeOn-subscriber将onSuccess下游调用(例如flatMap)重定向到给定的ObserveOn-Scheduler-Thread。因此,可以认为,flatMap是从非主循环线程调用的。

示例:

@Test fun wurst() { val thirdPartySDKImpl = ThirdPartySDKImpl() Single.create<String> { emitter -> thirdPartySDKImpl.doSomeAction { emitter.onSuccess(it) } } // .subscribeOn(Schedulers.computation()) // move emit from unknown thread to computation thread .observeOn(Schedulers.computation()) // Single.just will be subscribe from a computation thread .flatMap { Single.just(123) } // move onSucess/ onError emit from computation thread to main-thread .observeOn(AndroidSchedulers.mainThread()) // subscribe onNext / onError will be called from the main-android-thread .subscribe({}, {}) } interface ThirdPartySDK { fun doSomeAction(callback: (v: String) -> Unit) } class ThirdPartySDKImpl : ThirdPartySDK { override fun doSomeAction(callback: (v: String) -> Unit) { // <- impl-detail -> callback("whatever") } }

注意:如果create-lambda不会阻止或执行某些CPU繁重的工作,则不需要subscriptionOn。如果仅预订回调(将从另一个线程调用),则不需要subscribeOn。

但是有什么方法可以在正确的线程上发出吗?

您不应在运算符中使用任何并发。您可能会想,您可以做类似的事情:

Single.create<String> { emitter -> thirdPartySDKImpl.doSomeAction { Schedulers.io().scheduleDirect { emitter.onSuccess(it) } } }

但是不建议这样做,因为您可能会破坏序列化的onNext合同^ 1。此示例将确保onSucess下游调用将发生在预期的线程上,但是取消/取消订阅未得到处理,并且可能存在其他陷阱。

[如果您有一个非反应性API,并且想要强制执行某些线程模型,我建议包装同步。具有异步API的API,并提供适当的observeOn / subscribeOn运算符。以后仅使用异步API。

interface ThirdPartySDKAsync { fun doSomeAction(): Single<String> } class ThirdPartySDKAsyncImpl(private val sdk: ThirdPartySDK, private val scheduler: Scheduler) : ThirdPartySDKAsync { override fun doSomeAction(): Single<String> { return Single.create<String> { emitter -> sdk.doSomeAction { emitter.onSuccess(it) } }.observeOn(scheduler) } }

进一步阅读:https://tomstechnicalblog.blogspot.com/2016/02/rxjava-understanding-observeon-and.html

^ 1一次只允许一个线程调用onNext / onSuccess / onError / onComplete

© www.soinside.com 2019 - 2024. All rights reserved.