RxJava:跳过fromIterable()中的所有错误,并在发出所有项目时通知订户-Flowable.parallel执行

问题描述 投票:0回答:1

我有一个API调用,可根据“ Id”验证某些状态。 API返回单一或错误。我有此类ID的列表,只有一个ID有效返回成功,否则不返回(所有ID的返回错误)。我需要的是,遍历每个Id并跳过API调用中的错误,直到列表成功或结束。我能够按顺序实现这一目标。但是,我正在尝试使用ParallelFlowable做同样的事情。当一个ID返回成功时,它可以正常工作;但是,当没有一个返回成功的ID(所有ID都失败)时,它只是跳过API中的所有错误,但在所有ID都经过验证后不通知订户。我不确定如何处理。

// API call
fun getStatus(Id: String): Single<String> {
  //... returns Single<String> or error
}

//Sequential flow, Working
fun getStatus(ids: List<String>): Single<String> {
  Observable.fromIterable(ids)
                .flatMapSingle { id ->
                    getStatus(id)
                        .onErrorResumeWith { singleSource ->
                            if (ids.last() == id)) { //If this is last item in list, return error
                                singleSource.onError(NoStatusFoundException())
                            } else {
                                // Skip errors until valid id is found or till the list reached end.
                                Observable.empty<String>()
                            }
                        }
                }.firstOrError()
}

// Parallel Flow, Need help here to identify the list is completed and return NoStatusFoundException
fun getStatus(ids: List<String>): Single<String> { 
              Flowable.fromIterable(ids)
                .parallel()
                .runOn(io())
                .flatMap{ id -> getStatus(id).toFlowable()
                          .onErrorResumeWith { Flowable.empty<String>() }
                        }
                .sequentialDelayError()
                .firstOrError()
}

android rx-java2 flowable rx-java3
1个回答
0
投票

您返回的Flowable.empty()立即完成订阅。取自文档:

返回不向{@link Subscriber}发送任何项目并立即调用其{@link Subscriber#onComplete onComplete}方法的Flowable。

也许您可以返回Flowable.just("")或在出现错误的情况下提供一些预期的参数。

© www.soinside.com 2019 - 2024. All rights reserved.