Flux with repeatWhen 使用 onCancel 永远不会终止

问题描述 投票:0回答:1

我想用我的代码做什么

我想构建一个无限的

Flux
来检查条件(功能标志),然后根据条件是否为真继续处理请求。这可能导致两条路径:

  • 条件为真。继续处理然后
    repeat
    Flux
  • 条件为假。不做任何处理,但
    repeat
    Flux
    exponentialBackoff
    持续一段时间
  • 如果下游处理抛出错误,那么
    retry
    Flux

为了满足上述要求,我有以下代码。

public Flux<String> getData(String request) {
    return Flux.just(request)
               .repeatWhen(Repeat.onlyIf(context -> isEnabled()))
               .repeatWhen(Repeat.onlyIf(context -> !isEnabled())
                                 .exponentialBackoff(Duration.ofSeconds(1L), Duration.ofSeconds(5L)))
               .flatMap(this::process)
               .retry();
               
}

测试问题

现在,当我试图通过模拟

isEnabled
函数返回一个假值来测试它时,我期望等待一段时间,然后期望没有元素从通量中返回。

我已尝试通过以下 4 种方式测试上述代码:

没有使用 thenAwait() 的虚拟时间

Flux<String> response = getData(sampleRequest);

StepVerifier.create(response)
         .thenAwait(Duration.ofSeconds(1L))
         .expectNextCount(0L)
         .thenCancel()
         .verify();

没有使用 expectNoEvent() 的虚拟时间

Flux<String> response = getData(sampleRequest);

StepVerifier.create(response)
         .expectNoEvent(Duration.ofSeconds(1L))
         .thenCancel()
         .verify();

虚拟时间使用 thenAwait()

Flux<String> response = getData(sampleRequest);

StepVerifier.withVirtualTime(() -> response)
         .expectSubscription()
         .thenAwait(Duration.ofSeconds(1L))
         .expectNextCount(0L)
         .thenCancel()
         .verify();

使用 expectNoEvent() 的虚拟时间

Flux<String> response = getData(sampleRequest);

StepVerifier.withVirtualTime(() -> response)
         .expectSubscription()
         .expectNoEvent(Duration.ofSeconds(1L))
         .thenCancel()
         .verify();

所有上述测试最终都会无限期地运行而不会终止。我想了解在

isEnabled
为假的情况下我错过了什么。我知道一旦遇到错误的条件值,重新订阅就会在后台发生,但是为什么
StepVerifier
不取消正在进行的
Flux
.

如果有人能指出我正确的方向,那将非常有帮助。

java reactive-programming project-reactor
1个回答
0
投票

重复,默认配置可以是无限次尝试(Long.MAX_VALUE)和全局超时(持续时间)。

使用 repeatMax(3) 最多重复 3 次

 .repeatWhen(Repeat.onlyIf(context -> !isEnabled())
  .exponentialBackoff(Duration.ofSeconds(1L),Duration.ofSeconds(9L)).repeatMax(3))
© www.soinside.com 2019 - 2024. All rights reserved.