下面的代码使用 project reactor 来模拟一个简单的交易,我们首先使用
newTrx()
来启动一个 trx,然后调用 getTrxResult()
来获取 trx 的结果。
请注意,我们将重试
doGet()
几次以等待trx完成。而我们使用retryWhen()
来实现重试。但是,我们发现超时没有按预期工作。根据我们的观察,似乎超时是在newTrx
Mono启动而不是doGet()
时计算的。
如何让timeout()只申请
doGet
Mono的重试?
static Mono<Boolean> newTrx(String trx) {
var launchTrx = Mono.fromCallable(() -> {
System.out.println("launch a new trx " + trx);
try { // simulate hard work
Thread.sleep(800);
} catch (Exception ignored) {}
return "trx_" + trx;
});
return launchTrx
.flatMap(t -> getTrxResult(t));
}
static Mono<Boolean> getTrxResult(String trxId) {
var count = new AtomicInteger(0);
var doGet = Mono.fromCallable(() -> {
try { // simulate the real work
Thread.sleep(200);
} catch (Exception ignored) {}
if (count.getAndIncrement() < 3) {
throw new RuntimeException("trx still in progress");
}
System.out.println(trxId + " completed");
return true;
});
return doGet
.retryWhen(Retry.fixedDelay(10, Duration.ofMillis(200))
.filter(ex -> ex instanceof RuntimeException)
)
.timeout(Duration.ofSeconds(1)); // <-----------
}
执行
newTrx("t1").block()
时,我们会得到如下错误
reactor.core.Exceptions$ReactiveException: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 1000ms in 'retryWhen' (并且没有配置回退)