Reactor 3.x - 限制groupBy Flux的时间

问题描述 投票:4回答:1

是否有任何方法可以强制groupBy()生成的Flux在一段时间后完成(或类似地,限制“开放”组的最大数量),而不管上游的完整性如何?我有以下内容:

Flux<Foo> someFastPublisher;

someFastPublisher
  .groupBy(f -> f.getKey())
  .delayElements(Duration.ofSeconds(1)) // rate limit each group
  .flatMap(g -> g) // unwind the group
  .subscribe()
;

并且我遇到了Flux挂起的情况,假设因为组的数量大于flatMap的并发性。我可以增加flatMap并发性,但没有简单的方法来判断最大可能的大小是多少。相反,我知道由Foo分组的Foo.key将在时间/发布顺序上彼此接近,并且宁愿在groupBy Flux与flatMap并发上使用某种时间窗口(并最终与两个不同的组w结束/同样的key()并不是什么大不了的事。

我猜测groupBy Flux将不会完成,直到someFastPubisher onCompletes - 即通量传递给flatMap只是保持“开放”(尽管他们不太可能得到一个新的事件)。

我可以通过在groupBy中预取Integer.MAXInteger.MAXing并发来解决这个问题 - 但是有没有办法控制组的“生命”?

project-reactor
1个回答
4
投票

是的:您可以将take(Duration)应用于组以确保它们提前关闭,并且具有相同键的新组将在此之后打开:

source.groupBy(v -> v.intValue() % 2)
      .flatMap(group -> group
              .take(Duration.ofMillis(1000))
              .count()
              .map(c -> "group " + group.key() + " size = " + c)
      )
      .log()
      .blockLast();
© www.soinside.com 2019 - 2024. All rights reserved.