Reactive Java:在运行时更改窗口大小

问题描述 投票:0回答:1

我每 1 秒就有一个 Flux 发射物品,重复。如何动态改变窗口大小?

public static void main(String[] args) throws InterruptedException {
    Duration interval = Duration.ofSeconds(1);

    Flux<String> flux = Flux.just("A", "B", "C", "D", "E", "F", "G", "H", "I", "J");
    flux
            .repeat()
            // on each emit from the publisher,
            // a batch of a dynamic size is read
            // could be just 1 item, 3 items, or even more
            .window(calcWindowSize()))
            .concatMap(Flux::collectList)
            // since we only emit every 1 second
            // the window size changes over time
            .delayElements(interval)
            .doOnNext(batch -> {
                log.info("Processed batch: " + batch + " at " + System.currentTimeMillis());
            })
            .subscribe();

    Thread.sleep(10_000);
}

private static int calcWindowSize() {
    // this calculation is a little more complex in reality
    // so let's just use some random int here
    return RandomUtils.nextInt(1, 10);
}

上面的代码不起作用,但希望说明我想要实现的目标。我尝试使用

windowWhen
但不理解它的语法。如果我无法动态更改窗口大小,我可以重新创建 Flux 并从前一秒离开的位置继续吗?

java spring-webflux project-reactor reactive
1个回答
0
投票

一般来说,当您的窗口边界取决于项目(例如数量等)时,那么

windowUntil
Predicate<T>
将是最简单的选择。

它一一获取所有元素,并在窗口应该关闭时返回

true
。您可以在谓词内运行计数器或其他逻辑。

对于您的合成示例,类似这样的内容应该有效:

public static void main(String[] args) throws InterruptedException {
    Duration interval = Duration.ofSeconds(1);

    Flux<String> flux = Flux.just("A", "B", "C", "D", "E", "F", "G", "H", "I", "J");
    flux
            .repeat()
            // on each emit from the publisher,
            // a batch of a dynamic size is read
            // could be just 1 item, 3 items, or even more
            .windowUntil(boundaries())
            .concatMap(Flux::collectList)
            // since we only emit every 1 second
            // the window size changes over time
            .delayElements(interval)
            .doOnNext(batch -> {
                log.info("Processed batch: " + batch + " at " + System.currentTimeMillis());
            })
            .subscribe();

    Thread.sleep(10_000); 
}

private static Predicate<String> boundaries() {
    AtomicInteger counter = new AtomicInteger(calcWindowSize());
    return {
       boolean done = counter.decrementAndGet() <= 0;
       if (done) {
          counter.set(calcWindowSize());
       }
       done
    }
}

private static int calcWindowSize() {
    // this calculation is a little more complex in reality
    // so let's just use some random int here
    return RandomUtils.nextInt(1, 10); 
}
© www.soinside.com 2019 - 2024. All rights reserved.