Project Reactor:retryWhen() 的超时未按预期工作

问题描述 投票:0回答:0

下面的代码使用 project reactor 来模拟一个简单的交易,我们首先使用

newTrx()
来启动一个 trx,然后调用
getTrxResult()
来获取 trx 的结果。

请注意,我们将重试

doGet()
几次以等待trx完成。而我们使用
retryWhen()
来实现重试。但是,我们发现超时没有按预期工作。根据我们的观察,似乎超时是在
newTrx
Mono启动而不是
doGet()
时计算的。

如何让timeout()只申请

doGet
Mono的重试?

    static Mono<Boolean> newTrx(String trx) {
        var launchTrx = Mono.fromCallable(() -> {
            System.out.println("launch a new trx " + trx);
            try {  // simulate hard work
                Thread.sleep(800);
            } catch (Exception ignored) {}
            return "trx_" + trx;
        });
        return launchTrx
                .flatMap(t -> getTrxResult(t));
    }

    static Mono<Boolean> getTrxResult(String trxId) {
        var count = new AtomicInteger(0);
        var doGet = Mono.fromCallable(() -> {
            try {  // simulate the real work
                Thread.sleep(200);
            } catch (Exception ignored) {}
            if (count.getAndIncrement() < 3) {
                throw new RuntimeException("trx still in progress");
            }
            System.out.println(trxId + " completed");
            return true;
        });

        return doGet
                .retryWhen(Retry.fixedDelay(10, Duration.ofMillis(200))
                        .filter(ex -> ex instanceof RuntimeException)
                )
                .timeout(Duration.ofSeconds(1));   // <-----------
    }

执行

newTrx("t1").block()
时,我们会得到如下错误

reactor.core.Exceptions$ReactiveException: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 1000ms in 'retryWhen' (并且没有配置回退)

project-reactor
© www.soinside.com 2019 - 2024. All rights reserved.