想象一个重复执行的任务,每次执行之间最多有 10 秒的延迟。还有一个外部信号会导致任务立即执行。
(外部信号后,计时器是否重置回10秒并不重要)。
我尝试过两种建模方式,但都不理想:
A) 使用
Flux.interval(Duration.ofSeconds(10))
进行基于计时器的执行。使用由 Sinks.many()
创建的热通量,该热通量被馈送到外部信号。合并两个通量,并在 concatMap
运算符中对合并结果执行任务。
这种方法的问题在于信号可能会在执行之前“堆积”。添加
onBackpressureLatest()
可以缓解堆耗尽,但如果任务的一次执行花费的时间超过延迟间隔,则不会阻止多个信号排队。
B) 添加
delay()
,然后添加 repeat()
。这里的挑战是,当热外部触发通量发出一个值时,我无法弄清楚如何让延迟可靠地提前结束。 Mono.firstWithValue
几乎是我想要的,但它每次都会订阅外部触发通量,这意味着可能会错过信号。
如何使用 Reactor 实现这种任务调度?或者它只是不适合这项工作的工具?
考虑使用此代码片段。我简化了你的情况,但应该有利于进一步改进。
private static final Integer TIMEOUT = 10;
@Test
void shouldReactOnTriggerOrProvideDefaultValue() {
trigger()
.flatMap(trigger -> task())
.timeout(Duration.ofSeconds(TIMEOUT), task())
.log()
.as(StepVerifier::create)
.expectNextCount(1)
.verifyComplete();
}
private Mono<String> trigger() {
return Mono.just("trigger")
.delayElement(Duration.ofSeconds(new Random().nextInt(TIMEOUT)));
}
private Mono<String> task() {
return Mono.just("task");
}
Flux.windowTimeout(int, Duration)
使用 windowTimeout(1, Duration.ofSeconds(10))
时,生成的外部 Flux 将发出内部 Flux,每当收到一个元素或经过 10 秒时(以先到者为准),内部 Flux 就会结束。然后您可以将您的任务附加到窗口的末尾。
示例:
Flux<String> externalSignalFlux;
public Mono<String> task() {
return Mono.just("Task completed");
}
public Flux<String> windowTimeoutDemo() {
return externalSignalFlux
.windowTimeout(1, Duration.ofSeconds(10))
.concatMap(window -> window.then(task()));
}
如果需要处理信号的值:
Flux<String> externalSignalFlux;
public Mono<String> task(String signal) {
return Mono.just("Task completed: " + signal);
}
public Flux<String> windowTimeoutDemo() {
return externalSignalFlux
.windowTimeout(1, Duration.ofSeconds(10))
.concatMap(window -> window.defaultIfEmpty("repeatedTask"))
.concatMap(this::task);
}
Flux.bufferTimeout(int, Duration)
看起来非常相似,但每个窗口的超时仅在接收到第一个元素时才开始,从而导致生成的 Flux 中存在间隙。