我想用我的代码做什么
我想构建一个无限的
Flux
来检查条件(功能标志),然后根据条件是否为真继续处理请求。这可能导致两条路径:
repeat
Flux
repeat
Flux
和exponentialBackoff
持续一段时间retry
Flux
为了满足上述要求,我有以下代码。
public Flux<String> getData(String request) {
return Flux.just(request)
.repeatWhen(Repeat.onlyIf(context -> isEnabled()))
.repeatWhen(Repeat.onlyIf(context -> !isEnabled())
.exponentialBackoff(Duration.ofSeconds(1L), Duration.ofSeconds(5L)))
.flatMap(this::process)
.retry();
}
测试问题
现在,当我试图通过模拟
isEnabled
函数返回一个假值来测试它时,我期望等待一段时间,然后期望没有元素从通量中返回。
我已尝试通过以下 4 种方式测试上述代码:
没有使用 thenAwait() 的虚拟时间
Flux<String> response = getData(sampleRequest);
StepVerifier.create(response)
.thenAwait(Duration.ofSeconds(1L))
.expectNextCount(0L)
.thenCancel()
.verify();
没有使用 expectNoEvent() 的虚拟时间
Flux<String> response = getData(sampleRequest);
StepVerifier.create(response)
.expectNoEvent(Duration.ofSeconds(1L))
.thenCancel()
.verify();
虚拟时间使用 thenAwait()
Flux<String> response = getData(sampleRequest);
StepVerifier.withVirtualTime(() -> response)
.expectSubscription()
.thenAwait(Duration.ofSeconds(1L))
.expectNextCount(0L)
.thenCancel()
.verify();
使用 expectNoEvent() 的虚拟时间
Flux<String> response = getData(sampleRequest);
StepVerifier.withVirtualTime(() -> response)
.expectSubscription()
.expectNoEvent(Duration.ofSeconds(1L))
.thenCancel()
.verify();
所有上述测试最终都会无限期地运行而不会终止。我想了解在
isEnabled
为假的情况下我错过了什么。我知道一旦遇到错误的条件值,重新订阅就会在后台发生,但是为什么StepVerifier
不取消正在进行的Flux
.
如果有人能指出我正确的方向,那将非常有帮助。
重复,默认配置可以是无限次尝试(Long.MAX_VALUE)和全局超时(持续时间)。
使用 repeatMax(3) 最多重复 3 次
.repeatWhen(Repeat.onlyIf(context -> !isEnabled())
.exponentialBackoff(Duration.ofSeconds(1L),Duration.ofSeconds(9L)).repeatMax(3))