我有一个API调用,可根据“ Id”验证某些状态。 API返回单一或错误。我有此类ID的列表,只有一个ID有效返回成功,否则不返回(所有ID的返回错误)。我需要的是,遍历每个Id并跳过API调用中的错误,直到列表成功或结束。我能够按顺序实现这一目标。但是,我正在尝试使用ParallelFlowable做同样的事情。当一个ID返回成功时,它可以正常工作;但是,当没有一个返回成功的ID(所有ID都失败)时,它只是跳过API中的所有错误,但在所有ID都经过验证后不通知订户。我不确定如何处理。
// API call
fun getStatus(Id: String): Single<String> {
//... returns Single<String> or error
}
//Sequential flow, Working
fun getStatus(ids: List<String>): Single<String> {
Observable.fromIterable(ids)
.flatMapSingle { id ->
getStatus(id)
.onErrorResumeWith { singleSource ->
if (ids.last() == id)) { //If this is last item in list, return error
singleSource.onError(NoStatusFoundException())
} else {
// Skip errors until valid id is found or till the list reached end.
Observable.empty<String>()
}
}
}.firstOrError()
}
// Parallel Flow, Need help here to identify the list is completed and return NoStatusFoundException
fun getStatus(ids: List<String>): Single<String> {
Flowable.fromIterable(ids)
.parallel()
.runOn(io())
.flatMap{ id -> getStatus(id).toFlowable()
.onErrorResumeWith { Flowable.empty<String>() }
}
.sequentialDelayError()
.firstOrError()
}
您返回的Flowable.empty()立即完成订阅。取自文档:
返回不向{@link Subscriber}发送任何项目并立即调用其{@link Subscriber#onComplete onComplete}方法的Flowable。
也许您可以返回Flowable.just("")
或在出现错误的情况下提供一些预期的参数。