我配置了一个反应器链,我想在失败时重试 - 我可以这样做,但是我无法更改上下文,以便该链知道它何时处于重试状态。
在下面的示例中,我希望在重试期间“更改”FLAG 值(在 deferContextual 中)。
const val FLAG = "Flag"
fun main() {
val tr = TestRetry()
tr.main()
}
class TestRetry {
fun main() {
Mono.deferContextual { ctx ->
val flag = ctx.get<Any>(FLAG)
println("flag ${flag}")
Mono.just("hello")
}
.subscribeOn(Schedulers.boundedElastic())
.map {
println("map: $it")
if (true) {
throw IllegalArgumentException()
}
"there"
}
.retryWhen(retrySpec())
.contextWrite {
println("CONTEXT WRITE initialising FLAG")
it.put(FLAG, "original")
}
.subscribe()
Thread.sleep(15000);
}
fun retrySpec(): RetrySpec =
Retry.max(2L)
.doBeforeRetry {
val ctx = it.retryContextView()
val flag = ctx.getOrDefault(FLAG, "oops") // NOTE: here FLAG is altered
println ("DO BEFORE RETRY Flag ${flag}")
}
.filter { throwable ->
throwable is IllegalArgumentException
}
.onRetryExhaustedThrow { _, retrySignal ->
println("retries exhausted")
throw RuntimeException("done")
}.withRetryContext(
Context.of(FLAG, "altered")
)
}
运行上面的示例时,我尝试更改
withRetryContext
中 FLAG 变量的值,我可以看到 doBeforeRetry
中的值已更改,但是当我们返回 Mono.deferContextual
时,该值又回来了到“原来”。我认为我没有正确理解反应器代码的工作原理。
在retryWhen的javadoc中有一些如何修改上下文的示例代码:
Retry customStrategy = Retry.from(companion -> companion.handle((retrySignal, sink) -> {
Context ctx = sink.currentContext();
int rl = ctx.getOrDefault("retriesLeft", 0);
if (rl > 0) {
sink.next(Context.of(
"retriesLeft", rl - 1,
"lastError", retrySignal.failure()
));
} else {
sink.error(Exceptions.retryExhausted("retries exhausted", retrySignal.failure()));
}
}));
Mono<T> retried = originalMono.retryWhen(customStrategy);