I have implemented a repository pattern in MVP using RxJava2.
RemoteDataSource.java
public Observable<List<A>> getAList() {
    return ApiService.getAList()
            .compose(RxUtils.applySchedulers())
            .doOnSubscribe(disposable -> Timber.d(..))
            .doOnError(throwable -> Timber.d(..))
            .doOnComplete(() -> {
                Timber.d(..);
            });
}
LocalDataSource.java
public Observable<List<A>> getAList() {
    return mDbHelper .....from SQLBrite..
}

public void saveAList(List<A> a) {
    SQlBriteTransaction...
}
Repository.java (updated)
@Inject
public Repository(DownloadUtils downloadUtils) {
    this.mDownloadUtils = downloadUtils;
}

@Override
public Observable<List<A>> getAList() {
    return mRemoteDataSource
            .getAList()
            .flatMapIterable(aList -> aList)
            .flatMap(a ->
                    // ************************************************************
                    Observable.fromIterable(a.getB())
                            .flatMap((Function<B, ObservableSource<B>>) b ->
                                    Observable.create(emitter ->
                                            emitter.onNext(new DownloadUtils()
                                                    .downloadFiles(b, totalListCount, emitter))))
                            .toList()
                            .toObservable()
                    // ************************************************************
            )
            .toList()
            .toObservable()
            .doOnNext(a -> {
                // Only the first change in B value is inserted in the DB
                mLocalDataSource.saveAList(a);
            });
}
DownLoadUtils.java (updated)
void downloadBFiles(B b, int totalCount, ObservableEmitter<B> emitter) {
    fileCount = b.size;
    b.get(index).setDataToChange(dataToChange);
    // I am using PRDownloader for asynchronous download using recursion
    PRDownloader.download(remoteUrl, filePath, fileName)
            .build()
            .setOnStartOrResumeListener(() -> {
            })
            .setOnProgressListener(progress -> {
                int progressPercent = (int) (progress.currentBytes * 100 / progress.totalBytes);
            })
            .start(new OnDownloadListener() {
                @Override
                public void onDownloadComplete() {
                    emitter.onComplete();
                }

                @Override
                public void onError(Error error) {
                }
            });
}
Presenter.java
void getVideosFromRepo() {
    disposable = mRepository
            .getAList()
            .doOnSubscribe(d -> { /* "Started Loading" */ })
            .subscribe(
                    // onNext
                    // Here onNext is being called before the asynchronous operation completes!
                    a -> mView.setAList(a)
            );
}
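With the presenter implementation above, the list is delivered to the Presenter's onNext even before the asynchronous downloads have completed. What changes are needed so that onNext (in subscribe) is called only after all the downloads have finished?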
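You are using an asynchronous service outside of the RxJava observer chain, so RxJava cannot manage the data being passed along. Since downloadBFiles() uses a separate observer chain, you are probably losing the thread.

Instead of using doOnNext() to trigger the download, you will need to use flatMap(), so that the result of the download is included in your observer chain.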
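A minimal sketch of that idea, assuming RxJava 2 (io.reactivex) is on the classpath. FakeDownloader and its Listener are hypothetical stand-ins for PRDownloader and OnDownloadListener, and the String items stand in for B; none of these names come from the code above. Each callback-based download is wrapped in a Single with Single.create(), and flatMapSingle() merges those Singles into the chain, so toList() cannot emit until every download has reported completion.

```java
import io.reactivex.Observable;
import io.reactivex.Single;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class DownloadChainSketch {

    /** Hypothetical callback interface, standing in for PRDownloader's OnDownloadListener. */
    interface Listener {
        void onDownloadComplete();
        void onError(Throwable error);
    }

    /** Hypothetical async downloader: reports completion on another thread after a short delay. */
    static class FakeDownloader {
        final AtomicInteger completed = new AtomicInteger();
        private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);

        void download(String url, Listener listener) {
            executor.schedule(() -> {
                completed.incrementAndGet();
                listener.onDownloadComplete();
            }, 50, TimeUnit.MILLISECONDS);
        }

        void shutdown() {
            executor.shutdown();
        }
    }

    /** Wraps one callback-based download in a Single that succeeds only when the callback fires. */
    static Single<String> downloadFile(FakeDownloader downloader, String url) {
        return Single.create(emitter ->
                downloader.download(url, new Listener() {
                    @Override
                    public void onDownloadComplete() {
                        // Emit the finished item; a Single terminates on onSuccess.
                        emitter.onSuccess(url + " (downloaded)");
                    }

                    @Override
                    public void onError(Throwable error) {
                        emitter.onError(error);
                    }
                }));
    }

    /** Emits one list, and only after every download in it has completed. */
    static Observable<List<String>> downloadAll(FakeDownloader downloader, List<String> urls) {
        return Observable.fromIterable(urls)
                // flatMapSingle keeps each download inside the chain.
                .flatMapSingle(url -> downloadFile(downloader, url))
                // toList waits for the upstream to complete before emitting.
                .toList()
                .toObservable();
    }

    public static void main(String[] args) {
        FakeDownloader downloader = new FakeDownloader();
        List<String> result = downloadAll(downloader, Arrays.asList("a.mp4", "b.mp4"))
                .blockingFirst();
        System.out.println(result.size() + " downloads finished before onNext");
        downloader.shutdown();
    }
}
```

Because flatMapSingle() runs the downloads concurrently, the order of the resulting list is not guaranteed. If order matters and sequential downloads are acceptable, concatMapSingle() (available in RxJava 2.2) could be used instead.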