如何连续执行多个RxJava2流量

问题描述 投票:1回答:1

我正在自我介绍RxJava2,但我感觉自己做错了。就我而言,我想执行以下一些异步操作。

在此示例中,第一个操作是检查设备是否已连接(wifi或数据,让我们承认这需要花费时间),然后我想连接到api,然后我要进行http调用以获取列表(可观察),然后使用它。如果这些操作之一失败,则应引发onError或异常并在订阅中处理。

我有这个代码可以工作:

Single.create((SingleEmitter<Boolean> e) -> e.onSuccess(Connectivity.isDeviceConnected(MainActivity.this)) )
    .subscribeOn(Schedulers.io())
    .flatMap(isDeviceConnected -> {
        Log.i("LOG", "isDeviceConnected : "+ isDeviceConnected);
        if(!isDeviceConnected)
            throw new Exception("whatever"); // TODO : Chercher vrai erreur

        return awRepository.getFluxAuthenticate(host, port, user, password); // Single<DisfeApiAirWatch>
    })
    .toObservable()
    .flatMap(awRepository::getFluxManagedApps)  // List of apps : Observable<AirwatchApp>

    .observeOn(AndroidSchedulers.mainThread())
    .doFinally(this::hideProgressDialog)
    .subscribe(
            app -> Log.i("LOG", "OnNext : "+ app),
            error -> Log.i("LOG", "Error : " + error),
            () -> Log.i("LOG", "Complete : ")
);

但是一个为简单的“ if”表示布尔值的人听起来是错误的。 Completable似乎更合乎逻辑(工作与否,继续或停止)。我尝试使用以下代码,但不起作用。

Completable.create((CompletableEmitter e) -> {
    if(Connectivity.isDeviceConnected(MainActivity.this))
        e.onComplete(); // Guess not good, should call the complete of subscribe ?
    else
        e.onError(new Exception("whatever"));
} ).toObservable()
    .subscribeOn(Schedulers.io())
    .flatMap(awRepository.getFluxAuthenticate(host, port, user, password)) //Single<DisfeApiAirWatch>
    .toObservable()
    .flatMap(awRepository::getFluxManagedApps) // List of apps : Observable<AirwatchApp>

    .observeOn(AndroidSchedulers.mainThread())
    .doFinally(this::hideProgressDialog)
    .subscribe(
            app -> Log.i("LOG", "OnNext : "+ app),
            error -> Log.i("LOG", "Error : " + error),
            () -> Log.i("LOG", "Complete : ")
);

如何使此代码工作?

我知道我可以先对可编译对象进行订阅,并在此对象的“ onSuccess”中编写另一个通量/其余代码。但是我不认为彼此之间的堆栈流是一个好的解决方案。

最诚挚的问候

android asynchronous rx-java2
1个回答
0
投票
Completable.create((CompletableEmitter e) -> { if(Connectivity.isDeviceConnected(MainActivity.this)) e.onComplete(); else e.onError(new Exception("whatever")); }) .subscribeOn(Schedulers.io()) .andThen(awRepository.getFluxAuthenticate(host, port, user, password)) // <----------- .flatMapObservable(awRepository::getFluxManagedApps) .observeOn(AndroidSchedulers.mainThread()) .doFinally(this::hideProgressDialog) .subscribe( app -> Log.i("LOG", "OnNext : "+ app), error -> Log.i("LOG", "Error : " + error), () -> Log.i("LOG", "Complete : ")

);

© www.soinside.com 2019 - 2024. All rights reserved.