@Override
public void onPause() {
super.onPause();
compositeDisposable.clear();
}
@Override
public void onResume() {
super.onResume();
Flowable<MyData> distanceFlowable = myDataProcessor.hide().onBackpressureLatest()
.distinctUntilChanged()
.share();
compositeDisposable.add(distanceFlowable)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {Log.w("observer 1", data.value)});
compositeDisposable.add(distanceFlowable)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(data -> {Log.w("observer 2", data.value)});
}
当distanceFlowable连续获得新更改时,该代码运行良好。但是,在distanceFlowable上只有一个帖子的情况下,只有observer 1会收到通知。
它的行为如下:
我希望它表现为:
我尝试使用ConnectedFlowable
代替publish()
,然后在同时订阅了[[observer 1和observer 2之后将connect()
改为可流动对象。但是,即使清除了compositeDisposable
并且没有人在听,它仍然在可流动状态下发布。
flowable.connect()
返回我可以添加到Disposable
的compositeDisposable
,这样最终在onPause中将其清除了。