在rxjava2中完成异步请求之前调用Subscriber onNext

问题描述 投票:0回答:1

我使用RxJava2在MVP中实现了一个存储库模式

remote data source.Java

public Observable<List<A>> getAList(){
          return ApiService.
                getAList()
                .compose(RxUtils.applySchedulers())
                .doOnSubscribe(disposable -> Timber.d(..))
                .doOnError(throwable -> Timber.d(..))
                .doOnComplete(() -> {
                    Timber.d(..);
                });
      }

local data source.Java

public Observable<List<A>> getAList(){
    return mDbHelper .....from SQLBrite..
}

public void saveAList(List<<A> a){
    SQlBriteTransaction...
}

Repository.java(更新)

  @Inject
  public Repository(DownloadUtils downloadUtils){
   this.mDownloadUtils = downloadUtils;
   }

   @Override
   public Observable<List<A>> getAList(){
     return  mRemoteDataSource
            .getAList()
             .flatMapIterable(List<A> -> a)
            .flatMap(A a ->
      ************************************************************   
               return Observable.fromIterable(a.getB())
                     .flatMap((Function<B, ObservableSource<B>>) b ->
                                       Observable.create(emitter -> 
                                                emitter.onNext(new 
            DownloadUtils().downloadFiles(b,totalListCount,emitter))))
                                .toList()
                                .toObservable()

     ***************************************************
                    .toList()
                    .toObservable()
                    .doOnNext( List<A>  a -> {

     --------------Only the first change in B value is inserted in Db-
                       mLocalDataSource.saveAList(a);
                    });
    }

DownLoadUtils.java(更新)

void downloadBFiles(B b, int totalCount,ObservableEmitter<B> emitter){
        fileCount = b.size;
         b.get(index).setDataToChange(dataToChange);

         *** I am using PR Downloader for aynchronous download using 
            RECURSION **
        PRDownloader.download(remoteUrl, filePath, fileName)
                .build()
        .setOnStartOrResumeListener(() -> {
            })
            .setOnProgressListener(progress -> {
                int progressPercent = (int) (progress.currentBytes * 
                   100 / progress.totalBytes);,

             })
            .start(new OnDownloadListener() {
                @Override
                public void onDownloadComplete() {
  ********************* emitter.onComplete() ******************


             @Override
                public void onError(Error error) {
                  }   
     }

presenter.Java

void getVideosFromRepo(){

    disposable = mRepository
                 .getAList()
                 .doOnSubscribe(d _-> "Started Loading")
                 .subscribe(
                   //OnNext
    ------------- Here the OnNext is being called before Asynchronous Operation completes!!-------

                    List<A> a -> mView.setAList(a);
                   )


}

在演示者实现之上,即使在异步下载完成之前,也会在Presenter的onNext中返回List ...需要进行哪些更改,以便在所有下载完成后调用onNext(subscribe)。

android rx-java rx-java2 dagger-2 android-mvp
1个回答
0
投票

您正在使用RxJava观察者链之外的异步服务,因此RxJava无法管理传递的数据。由于downloadBFiles()使用一个单独的观察链,你可能会失去线程。

而不是使用doOnNext()来触发下载,您将需要使用flatMap(),以便下载结果包含在您的观察者链中。

© www.soinside.com 2019 - 2024. All rights reserved.