在 spring webflux 中,如何更改重试的上下文?

问题描述 投票:0回答:1

我配置了一个反应器链,我想在失败时重试 - 我可以这样做,但是我无法更改上下文,以便该链知道它何时处于重试状态。

在下面的示例中,我希望在重试期间“更改”FLAG 值(在 deferContextual 中)。

const val FLAG = "Flag"

fun main() {

    val tr = TestRetry()
    tr.main()
}

class TestRetry {

    fun main() {
        Mono.deferContextual { ctx ->
            val flag = ctx.get<Any>(FLAG)
            println("flag ${flag}")
            Mono.just("hello")
        }
        .subscribeOn(Schedulers.boundedElastic())
        .map {
            println("map: $it")
            if (true) {
                throw IllegalArgumentException()
            }
            "there"
        }
        .retryWhen(retrySpec())
        .contextWrite {
            println("CONTEXT WRITE initialising FLAG")
            it.put(FLAG, "original")
        }
        .subscribe()

        Thread.sleep(15000);
    }

    fun retrySpec(): RetrySpec =
        Retry.max(2L)
            .doBeforeRetry {
                val ctx = it.retryContextView()
                val flag = ctx.getOrDefault(FLAG, "oops") // NOTE: here FLAG is altered
                println ("DO BEFORE RETRY Flag ${flag}")
            }
            .filter { throwable ->
                throwable is IllegalArgumentException
            }
            .onRetryExhaustedThrow { _, retrySignal ->
                println("retries exhausted")
                throw RuntimeException("done")
            }.withRetryContext(
                Context.of(FLAG, "altered")
            )
}

运行上面的示例时,我尝试更改

withRetryContext
中 FLAG 变量的值,我可以看到
doBeforeRetry
中的值已更改,但是当我们返回
Mono.deferContextual
时,该值又回来了到“原来”。我认为我没有正确理解反应器代码的工作原理。

kotlin spring-webflux retrywhen
1个回答
0
投票

retryWhen的javadoc中有一些如何修改上下文的示例代码:

Retry customStrategy = Retry.from(companion -> companion.handle((retrySignal, sink) -> {
  Context ctx = sink.currentContext();
  int rl = ctx.getOrDefault("retriesLeft", 0);
  
  if (rl > 0) {
    sink.next(Context.of(
      "retriesLeft", rl - 1,
      "lastError", retrySignal.failure()
    ));
  } else {
    sink.error(Exceptions.retryExhausted("retries exhausted", retrySignal.failure()));
  }
}));

Mono<T> retried = originalMono.retryWhen(customStrategy);
© www.soinside.com 2019 - 2024. All rights reserved.