我每 1 秒就有一个 Flux 发射物品,重复。如何动态改变窗口大小?
public static void main(String[] args) throws InterruptedException {
Duration interval = Duration.ofSeconds(1);
Flux<String> flux = Flux.just("A", "B", "C", "D", "E", "F", "G", "H", "I", "J");
flux
.repeat()
// on each emit from the publisher,
// a batch of a dynamic size is read
// could be just 1 item, 3 items, or even more
.window(calcWindowSize()))
.concatMap(Flux::collectList)
// since we only emit every 1 second
// the window size changes over time
.delayElements(interval)
.doOnNext(batch -> {
log.info("Processed batch: " + batch + " at " + System.currentTimeMillis());
})
.subscribe();
Thread.sleep(10_000);
}
private static int calcWindowSize() {
// this calculation is a little more complex in reality
// so let's just use some random int here
return RandomUtils.nextInt(1, 10);
}
上面的代码不起作用,但希望说明我想要实现的目标。我尝试使用
windowWhen
但不理解它的语法。如果我无法动态更改窗口大小,我可以重新创建 Flux 并从前一秒离开的位置继续吗?
一般来说,当您的窗口边界取决于项目(例如数量等)时,那么
windowUntil
和 Predicate<T>
将是最简单的选择。
它一一获取所有元素,并在窗口应该关闭时返回
true
。您可以在谓词内运行计数器或其他逻辑。
对于您的合成示例,类似这样的内容应该有效:
public static void main(String[] args) throws InterruptedException {
Duration interval = Duration.ofSeconds(1);
Flux<String> flux = Flux.just("A", "B", "C", "D", "E", "F", "G", "H", "I", "J");
flux
.repeat()
// on each emit from the publisher,
// a batch of a dynamic size is read
// could be just 1 item, 3 items, or even more
.windowUntil(boundaries())
.concatMap(Flux::collectList)
// since we only emit every 1 second
// the window size changes over time
.delayElements(interval)
.doOnNext(batch -> {
log.info("Processed batch: " + batch + " at " + System.currentTimeMillis());
})
.subscribe();
Thread.sleep(10_000);
}
private static Predicate<String> boundaries() {
AtomicInteger counter = new AtomicInteger(calcWindowSize());
return {
boolean done = counter.decrementAndGet() <= 0;
if (done) {
counter.set(calcWindowSize());
}
done
}
}
private static int calcWindowSize() {
// this calculation is a little more complex in reality
// so let's just use some random int here
return RandomUtils.nextInt(1, 10);
}