我有以下代码:
Single.create { emitter ->
// I/O thread here
ThirdPartySDK.doSomeAction {
// Main thread here
emitter.onSuccess(someValue)
}
}
.flatMap {
someOtherSingle(it) // Executes on main thread
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({},{})
ThirdPartySDK.doSomeAction
回调在主线程上发布,因此,发射器也将在主线程上发出,而不是在订阅线程上发出(并且如果我在flatMap
中有进一步的网络交互,则链失败)。
如果我在第一个observeOn(Schedulers.io())
之后添加Single
,它将切换到正确的线程,但是有没有办法在正确的线程上发出信号?我无法修改ThirdPartySDK
行为。
observeOn
您已经说过,subscribeOn将仅在给定Scheduler-Thread上对subscribe调用subscribeActual方法调用。这并不意味着下游发射将在同一线程上。在您的情况下,onSuccess发出将从另一个线程(例如,数据库/ Http-ThreadPool等)中调用。
onSuccess将从未知线程(在您的情况下为主线程)中调用。下游调用将从主线程调用。因此,从主线程调用flatMap。 flatMap中的主线程上的网络调用可能会失败,因为不允许它“阻塞”主线程。
如何解决此问题?只需在Single#create之后放置一个observeOn。主线程调用onSucess。观察者-订阅者将从主线程中调用。 observeOn-subscriber将onSuccess下游调用(例如flatMap)重定向到给定的ObserveOn-Scheduler-Thread。因此,可以认为,flatMap是从非主循环线程调用的。
示例:
@Test
fun wurst() {
val thirdPartySDKImpl = ThirdPartySDKImpl()
Single.create<String> { emitter ->
thirdPartySDKImpl.doSomeAction {
emitter.onSuccess(it)
}
}
// .subscribeOn(Schedulers.computation())
// move emit from unknown thread to computation thread
.observeOn(Schedulers.computation())
// Single.just will be subscribe from a computation thread
.flatMap { Single.just(123) }
// move onSucess/ onError emit from computation thread to main-thread
.observeOn(AndroidSchedulers.mainThread())
// subscribe onNext / onError will be called from the main-android-thread
.subscribe({}, {})
}
interface ThirdPartySDK {
fun doSomeAction(callback: (v: String) -> Unit)
}
class ThirdPartySDKImpl : ThirdPartySDK {
override fun doSomeAction(callback: (v: String) -> Unit) {
// <- impl-detail ->
callback("whatever")
}
}
注意:如果create-lambda不会阻止或执行某些CPU繁重的工作,则不需要subscriptionOn。如果仅预订回调(将从另一个线程调用),则不需要subscribeOn。但是有什么方法可以在正确的线程上发出吗?您不应在运算符中使用任何并发。您可能会想,您可以做类似的事情:
Single.create<String> { emitter -> thirdPartySDKImpl.doSomeAction { Schedulers.io().scheduleDirect { emitter.onSuccess(it) } } }
但是不建议这样做,因为您可能会破坏序列化的onNext合同^ 1。此示例将确保onSucess下游调用将发生在预期的线程上,但是取消/取消订阅未得到处理,并且可能存在其他陷阱。[如果您有一个非反应性API,并且想要强制执行某些线程模型,我建议包装同步。具有异步API的API,并提供适当的observeOn / subscribeOn运算符。以后仅使用异步API。
interface ThirdPartySDKAsync { fun doSomeAction(): Single<String> } class ThirdPartySDKAsyncImpl(private val sdk: ThirdPartySDK, private val scheduler: Scheduler) : ThirdPartySDKAsync { override fun doSomeAction(): Single<String> { return Single.create<String> { emitter -> sdk.doSomeAction { emitter.onSuccess(it) } }.observeOn(scheduler) } }
进一步阅读:https://tomstechnicalblog.blogspot.com/2016/02/rxjava-understanding-observeon-and.html^ 1一次只允许一个线程调用onNext / onSuccess / onError / onComplete