Project reactor:flatMap之后的onErrorResume

问题描述 投票:0回答:2
Flux.just("a", "b")
        .flatMap(s -> s.equals("a") ? Mono.error(new RuntimeException() : Flux.just(s + "1", s + "2"))
        .onErrorResume(throwable -> Mono.empty())
        .subscribe(System.out::println);

你好!

在这里,我制作了两个元素的流量,然后通过flatMap首先将其暴露给异常,将第二个暴露给另一个Flux。

随着onErrorResume我期待输出

b1
b2

但一无所获。有谁可以解释为什么会发生,拜托?

谢谢。

java project-reactor reactor reactive-streams
2个回答
4
投票

鉴于这种:

Flux.just("a", "b", "c")
        .flatMap { s ->
            if (s == "b") 
                Mono.error<RuntimeException>(RuntimeException()) 
            else 
                Flux.just(s + "1", s + "2")
        }.onErrorResume { throwable -> Mono.just("d") }.log()
        .subscribe { println(it) }

输出是:

12:35:19.673 [main] INFO reactor.Flux.OnErrorResume.1 - onSubscribe(FluxOnErrorResume.ResumeSubscriber)
12:35:19.676 [main] INFO reactor.Flux.OnErrorResume.1 - request(unbounded)
12:35:19.677 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(a1)
a1
12:35:19.677 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(a2)
a2
12:35:19.712 [main] INFO reactor.Flux.OnErrorResume.1 - onNext(d)
d
12:35:19.713 [main] INFO reactor.Flux.OnErrorResume.1 - onComplete()

这里发生了什么? onErrorResume()正在应用于flatMap()运营商返回的发布者。由于在“b”发布者发出故障信号,flatMap()发布者不再执行,onErrorResume()运营商继续使用其后备发布。

documentation for onErrorResume()清楚地表明原始发布者因错误和后备接管而结束:

enter image description here


1
投票

这个问题已经由codependent得到了一个可靠的答案,为什么会发生这种情况。回答一些偏离主题如何实现预期的输出:

onErrorResume调用必须移动到flatMap

Flux.just("a", "b")
    .flatMap(s ->
        (s.equals("a") ? Mono.error<RuntimeException>(RuntimeException()) : Flux.just(s + "1", s + "2"))
             .onErrorResume(ex -> Mono.empty())
    )
    .subscribe(System.out::println)

这样输出就像预期的那样

b1
b2
© www.soinside.com 2019 - 2024. All rights reserved.