所以我尝试使用 onErrorReturn
返回我想要的结果,但它将完成后的流,我如何抓住错误返回为下一个,并仍然继续流?
用下面的代码,它不会达到 retryWhen
当有错误,如果我翻转它,它不会重新订阅与 retryWhen
如有错误
fun process(): Observable<State> {
return publishSubject
.flatMap { intent ->
actAsRepo(intent) // Might return error
.map { State(data = it, error = null) }
}
.onErrorReturn { State(data = "", error = it) } // catch the error
.retryWhen { errorObs ->
errorObs.flatMap {
Observable.just(State.defaultState()) // continue subscribing
}
}
}
private fun actAsRepo(string: String): Observable<String> {
if (string.contains('A')) {
throw IllegalArgumentException("Contains A")
} else {
return Observable.just("Wrapped from repo: $string")
}
}
订户将
viewModel.process().subscribe(this::render)
onError是一个终端操作符。如果发生了onError,它将在操作符之间传递。你可以使用一个onError操作符,它可以捕获onError并提供一个回退。
在你的例子中,onError发生在flatMap的内部流中。onError将被传播到下游的onErrorReturn操作器中。如果你看一下实现,你会看到onErrorReturn lambda将被调用,结果将在onComplete之后的onNext被推送到下游。
@Override
public void onError(Throwable t) {
T v;
try {
v = valueSupplier.apply(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
downstream.onError(new CompositeException(t, e));
return;
}
if (v == null) {
NullPointerException e = new NullPointerException("The supplied value is null");
e.initCause(t);
downstream.onError(e);
return;
}
downstream.onNext(v); // <--------
downstream.onComplete(); // <--------
}
你的流完成的原因是 #retryWhen JavaDoc
如果对操作者的上游是异步的,立即在onComplete之后发出onNext的信号,可能导致序列立即完成。同样,如果这个内部的{@code ObservableSource}在上游处于活动状态时发出{@code onError}或{@code onComplete}信号,序列就会立即用同样的信号终止。
把onErrorReturn放在flatMap的map opreator后面 使用这种排序方式,当内侧的flatMap流onErrors时,你的流将不会完成。
flatMap操作者完成,当外层(源:publishSubject)和内层流(subscription)都完成的时候。在这种情况下,外层流(publishSubject)发出onNext,而内层流将在通过onNext发送{ State(data = "", error = it) }后完成。因此流将保持开放状态。
interface ApiCall {
fun call(s: String): Observable<String>
}
class ApiCallImpl : ApiCall {
override fun call(s: String): Observable<String> {
// important: warp call into observable, that the exception is caught and emitted as onError downstream
return Observable.fromCallable {
if (s.contains('A')) {
throw IllegalArgumentException("Contains A")
} else {
s
}
}
}
}
data class State(val data: String, val err: Throwable? = null)
apiCallImpl.call将返回一个懒惰的observable,它将在订阅时抛出一个错误,而不是在observable装配时。
// no need for retryWhen here, except you want to catch onComplete from the publishSubject, but once the publishSubject completes no re-subscription will help you, because the publish-subject is terminated and onNext invocations will not be accepted anymore (see implementation).
fun process(): Observable<State> {
return publishSubject
.flatMap { intent ->
apiCallImpl.call(intent) // Might return error
.map { State(data = it, err = null) }
.onErrorReturn { State("", err = it) }
}
}
lateinit var publishSubject: PublishSubject<String>
lateinit var apiCallImpl: ApiCallImpl
@Before
fun init() {
publishSubject = PublishSubject.create()
apiCallImpl = ApiCallImpl()
}
@Test
fun myTest() {
val test = process().test()
publishSubject.onNext("test")
publishSubject.onNext("A")
publishSubject.onNext("test2")
test.assertNotComplete()
.assertNoErrors()
.assertValueCount(3)
.assertValueAt(0) {
assertThat(it).isEqualTo(State("test", null))
true
}
.assertValueAt(1) {
assertThat(it.data).isEmpty()
assertThat(it.err).isExactlyInstanceOf(IllegalArgumentException::class.java)
true
}
.assertValueAt(2) {
assertThat(it).isEqualTo(State("test2", null))
true
}
}
这个替代方案与第一个解决方案有一点不同。FlatMap-Operator采用了一个布尔值(delayError),这将导致吞噬onError消息,直到源完成。当源完成时,错误将被发出。
您可以使用 delayError true,当异常没有任何用处,并且必须在出现时不被记录下来时
fun process(): Observable<State> {
return publishSubject
.flatMap({ intent ->
apiCallImpl.call(intent)
.map { State(data = it, err = null) }
}, true)
}
只发出两个值。错误不会被转换为回退值。
@Test
fun myTest() {
val test = process().test()
publishSubject.onNext("test")
publishSubject.onNext("A")
publishSubject.onNext("test2")
test.assertNotComplete()
.assertNoErrors()
.assertValueAt(0) {
assertThat(it).isEqualTo(State("test", null))
true
}
.assertValueAt(1) {
assertThat(it).isEqualTo(State("test2", null))
true
}
.assertValueCount(2)
}
注意:我想在这种情况下你应该使用switchMap,而不是flatMap。