[rxjava计时器在成功执行订阅后引发超时异常

问题描述 投票:2回答:2

我有以下代码:

repo.getObservable()
            .timeout(1, TimeUnit.MINUTES)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnSubscribe {
                _isInProgress.value = true
            }
            .doFinally {
                _isInProgress.value = false
            }
            .subscribe(
                    {
                        Timber.d("Success")
                    },
                    {

                        Timber.e(it)
                    })
            .trackDisposable()

问题是几秒钟后我成功获得了成功消息,但是我的预加载器仍然等待1分钟,然后我的订阅错误部分被执行。那是预期的行为吗?如果成功执行了订阅的成功部分,我该如何停止超时?

P。 S.从getObservable()返回的Observable的创建方式如下:PublishSubject.create()

android rx-java rx-java2
2个回答
2
投票
这必须是因为您没有在PublishSubject上调用onComplete,而仅调用了onNext。参见以下示例:

PublishSubject<Integer> source = PublishSubject.create(); // It will get 1, 2, 3, 4 and onComplete source.subscribe(getFirstObserver()); source.onNext(1); source.onNext(2); source.onNext(3); // It will get 4 and onComplete for second observer also. source.subscribe(getSecondObserver()); source.onNext(4); source.onComplete();

直到onComplete被称为观察者正在等待更多结果。您可以在收到等待的结果后单击unsubscribe/dispose,或者在所有结果发送完毕后在Observable上调用onComplete

0
投票
如果需要一个结果,请在take(1)之前或之后使用timeout

repo.getObservable() .take(1) // <--------------------------------------------- .timeout(1, TimeUnit.MINUTES) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .doOnSubscribe { _isInProgress.value = true } .doFinally { _isInProgress.value = false } .subscribe( { Timber.d("Success") }, { Timber.e(it) }) .trackDisposable()

© www.soinside.com 2019 - 2024. All rights reserved.