如何定时执行任务并响应外部信号?

问题描述 投票:0回答:2

想象一个重复执行的任务,每次执行之间最多有 10 秒的延迟。还有一个外部信号会导致任务立即执行。 (外部信号后,计时器是否重置回10秒并不重要。)

我尝试过两种建模方式,但都不理想:

  1. 使用

    Flux.interval(Duration.ofSeconds(10))
    进行基于计时器的执行。使用由
    Sinks.many()
    创建的热通量,该热通量被馈送到外部信号。合并两个通量,并在
    concatMap
    运算符中对合并结果执行任务。

    这种方法的问题在于信号可能会在执行之前“堆积”。添加

    onBackpressureLatest()
    可以缓解堆耗尽,但如果任务的一次执行花费的时间超过延迟间隔,则不会阻止多个信号排队。

  2. 添加

    delay()
    ,然后添加
    repeat()
    。这里的挑战是,当热外部触发通量发出一个值时,我无法弄清楚如何让延迟可靠地提前结束。
    Mono.firstWithValue
    几乎是我想要的,但它每次都会订阅外部触发通量,这意味着可能会错过信号。

如何使用 Reactor 实现这种任务调度?或者它只是不适合这项工作的工具?

project-reactor
2个回答
1
投票

考虑使用此代码片段。我简化了你的情况,但应该有利于进一步改进。

private static final Integer TIMEOUT = 10;

@Test
void shouldReactOnTriggerOrProvideDefaultValue() {
    trigger()
        .flatMap(trigger -> task())
        .timeout(Duration.ofSeconds(TIMEOUT), task())
        .log()
        .as(StepVerifier::create)
        .expectNextCount(1)
        .verifyComplete();
}

private Mono<String> trigger() {
    return Mono.just("trigger")
        .delayElement(Duration.ofSeconds(new Random().nextInt(TIMEOUT)));
}

private Mono<String> task() {
    return Mono.just("task");
}

1
投票

您应该查看

Flux.windowTimeout(int, Duration)
使用
windowTimeout(1, Duration.ofSeconds(10))
时,生成的外部 Flux 将发出内部 Flux,每当收到一个元素或经过 10 秒时(以先到者为准),内部 Flux 就会结束。然后您可以将您的任务附加到窗口的末尾。

示例:

Flux<String> externalSignalFlux;

public Mono<String> task() {
    return Mono.just("Task completed");
}

public Flux<String> windowTimeoutDemo() {
    return externalSignalFlux
            .windowTimeout(1, Duration.ofSeconds(10))
            .concatMap(window -> window.then(task()));
}

如果需要处理信号的值:

Flux<String> externalSignalFlux;

public Mono<String> task(String signal) {
    return Mono.just("Task completed: " + signal);
}

public Flux<String> windowTimeoutDemo() {
    return externalSignalFlux
            .windowTimeout(1, Duration.ofSeconds(10))
            .concatMap(window -> window.defaultIfEmpty("repeatedTask"))
            .concatMap(this::task);
}

注意:虽然

Flux.bufferTimeout(int, Duration)
看起来非常相似,但每个窗口的超时仅在接收到第一个元素时才开始,从而导致生成的 Flux 中存在间隙。

© www.soinside.com 2019 - 2024. All rights reserved.