RxJava2:如何在处置订阅者后避免InterruptibleException?

问题描述 投票:0回答:1

我有一个观察对象:

public Observable<List<Conversation>> getConversationListObservable() {
    return Observable.create(emitter -> {
        List<Conversation> conversations = networkApi.getConversations();
        for (Conversation conversation : conversations) {
            if (emitter.isDisposed()) return;  // this will cause subscription to terminate.
            List<User> users = networkApi.getUserList(conversation.getId());
            conversation.setUsers(users);
        }
        emitter.onNext(conversations);
        emitter.onComplete();
    });
}

androidx.lifecycle.ViewModel类中使用的:

public class ConversationViewModel extends ViewModel {
    private CompositeDisposable disposables = new CompositeDisposable();
    ....
    public void fetchConversationList(){
        disposables.add(repository.getConversationListObservable()
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeOn(Schedulers.io())
                .subscribe(this::setConversations, this::onError));
    }
}

[当我离开带有Conversation列表的屏幕时,将放置此可观察的对象,但是我在logcat类中的RxJavaPlugins.setErrorHandler中从Application中发出警告:

W/RxJavaPlugins.setErrorHandler - Undeliverable exception received, not sure what to do: java.lang.InterruptedException

networkApi.getUserList通话中。好像当我进行此网络呼叫时,我的订户并没有在网络呼叫开始时就被释放,而在我获得对呼叫的响应时已经被释放。除了从InterruptedException类中删除此插件之外,是否有其他方法无法在RxJavaPlugins.setErrorHandler中获得Application

P.S。:堆栈跟踪如下:

2019-11-24 23:28:00.152 18051-18343/com.example W/RxJavaPlugins.setErrorHandler - Undeliverable exception received, not sure what to do: java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1036)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1327)
    at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:212)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:222)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:190)
    at com.example.client.Client.sendRequest(Client.scala:61)
    at com.example.ui.Repository.provideClient(Repository.java:205)
    at com.example.ui.Repository.fetchUser(Repository.java:981)
    at com.example.ui.Repository.fetchUserWithRole(Repository.java:987)
    at com.example.ui.Repository.access$2400(Repository.java:95)
    at com.example.ui.Repository$11.lambda$createCall$1$Repository$11(Repository.java:912)
    at com.example.ui.-$$Lambda$Repository$11$8ZJhKkqn7hg2E6f5A5NBX1EeUPY.subscribe(lambda)
    at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
    at io.reactivex.Observable.subscribe(Observable.java:12267)
    at io.reactivex.internal.operators.observable.ObservableOnErrorNext.subscribeActual(ObservableOnErrorNext.java:38)
    at io.reactivex.Observable.subscribe(Observable.java:12267)
    at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
    at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:578)
    at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
    at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
    at java.util.concurrent.FutureTask.run(FutureTask.java:237)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
    at java.lang.Thread.run(Thread.java:761)

这是来自真实代码的堆栈跟踪,但是我简化了当前问题的代码。此堆栈跟踪中的Client.sendRequest对应于简化示例中的networkApi.getUserList

android rx-java2 dispose rx-android interrupted-exception
1个回答
0
投票

[我不确定,但我只看到一个问题,if (emitter.isDisposed()) return;跳过了for loop中的代码,但是当放置发射器时,emitter.onNext()emitter.onComplete()仍然执行,您需要包装emitter.onNext()emitter.onComplete()] >

if (!emitter.isDisposed()){
        emitter.onNext(conversations);
        emitter.onComplete();
}

尽管我认为这个问题需要是另一个stacktrace错误,但您可以尝试。

© www.soinside.com 2019 - 2024. All rights reserved.