Flux.just("a", "b")
.flatMap(s -> s.equals("a") ? Mono.error(new RuntimeException() : Flux.just(s + "1", s + "2"))
.onErrorResume(throwable -> Mono.empty())
.subscribe(System.out::println);
你好!
在这里,我制作了两个元素的流量,然后通过flatMap首先将其暴露给异常,将第二个暴露给另一个Flux。
随着onErrorResume
我期待输出
b1
b2
但一无所获。有谁可以解释为什么会发生,拜托?
谢谢。
鉴于这种:
Flux.just("a", "b", "c")
.flatMap { s ->
if (s == "b")
Mono.error<RuntimeException>(RuntimeException())
else
Flux.just(s + "1", s + "2")
}.onErrorResume { throwable -> Mono.just("d") }.log()
.subscribe { println(it) }
输出是:
12:35:19.673 [main] INFO reactor.Flux.OnErrorResume.1 - onSubscribe(FluxOnErrorResume.ResumeSubscriber)
12:35:19.676 [main] INFO reactor.Flux.OnErrorResume.1 - request(unbounded)
12:35:19.677 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(a1)
a1
12:35:19.677 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(a2)
a2
12:35:19.712 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(d)
d
12:35:19.713 [main] INFO reactor.Flux.OnErrorResume.1 - onComplete()
这里发生了什么? onErrorResume()
正在应用于flatMap()
运营商返回的发布者。由于在“b”发布者发出故障信号,flatMap()
发布者不再执行,onErrorResume()
运营商继续使用其后备发布。
documentation for onErrorResume()清楚地表明原始发布者因错误和后备接管而结束:
这个问题已经由codependent
得到了一个可靠的答案,为什么会发生这种情况。回答一些偏离主题如何实现预期的输出:
onErrorResume
调用必须移动到flatMap
:
Flux.just("a", "b")
.flatMap(s ->
(s.equals("a") ? Mono.error<RuntimeException>(RuntimeException()) : Flux.just(s + "1", s + "2"))
.onErrorResume(ex -> Mono.empty())
)
.subscribe(System.out::println)
这样输出就像预期的那样
b1
b2