如何监听reactor netty触发doOnError的原因?

问题描述 投票:0回答:2

定位问题,发现是webflux的Mono触发了doOnCancel。好像是浏览器链接中断导致的,导致执行了doOnCancel。现在我想知道如何避免我的 Mono 由于上游问题而触发 doOnCancel 。

以下是我的测试代码。当浏览器页面请求该接口并立即刷新时,会触发doOnCancel的执行。但是,我希望避免执行 doOnCancel。

@GetMapping("/api/testOnCancel")
    public Mono<String> testOnCancel() {
        return Mono.just("1").
                flatMap(str -> Mono.just("test" + str))
                .delayElement(Duration.ofMinutes(2))
                .doOnCancel(() -> System.out.println("on cancel"))
                .doOnSuccess(str -> System.out.println("success"))
                .doOnError(System.out::println)
                .flatMap(str -> Mono.just("byby" + str));
    }
spring-boot spring-webflux project-reactor reactor-netty
2个回答
1
投票

我已经解决了这个问题,为了帮助和我遇到同样问题的朋友,我在这里回答一下。

只需在最后添加一个缓存操作符调用即可。

@GetMapping("/api/testOnCancel")
    public Mono<String> testOnCancel() {
        return Mono.just("1").
                flatMap(str -> Mono.just("test" + str))
                .delayElement(Duration.ofMinutes(2))
                .doOnCancel(() -> System.out.println("on cancel"))
                .doOnSuccess(str -> System.out.println("success"))
                .doOnError(System.out::println)
                .flatMap(str -> Mono.just("byby" + str))
                .cache();
    }

0
投票

正如您已经说过的,没有失败的原因。这就是为什么你在

doOnError
中看不到原因。

您已经发现您的单声道被取消了(可能是因为您添加的延迟)。您可以在 baeldung 上的这篇文章中找到一些取消的原因,例如背压、资源不足等

记住这些信息,您的代码就可以了。它通过打印一些消息来处理取消,如果没有取消但出现错误,您也可以打印它,这样您就可以看到原因。

现在,关于取消事件,您需要了解为什么会发生这种情况。在某些时候,您触发低于 2' 的阈值,这会取消此单声道执行(默认情况下应该没有问题)。为了说明这一点,假设我们有一个单元测试:

@Test void something() { Mono<String> test = Mono.just("1"). flatMap(str -> Mono.just("test" + str)) .delayElement(Duration.ofMinutes(2)) .doOnCancel(() -> System.out.println("on cancel")) .doOnSuccess(str -> System.out.println("success")) .doOnError(System.out::println) .flatMap(str -> Mono.just("byby" + str)); // uncommenting the next line should print 'bybytest1' without errors, after 2m // System.out.println(test.block()); // uncommenting next line should print 'on cancel' and a timeout error, after 30s: // java.lang.IllegalStateException: Timeout on blocking read for 30000000000 NANOSECONDS // System.out.println(test.block(Duration.ofSeconds(30))); }
    
© www.soinside.com 2019 - 2024. All rights reserved.