是否有任何方法可以强制groupBy()生成的Flux在一段时间后完成(或类似地,限制“开放”组的最大数量),而不管上游的完整性如何?我有以下内容:
Flux<Foo> someFastPublisher;
someFastPublisher
.groupBy(f -> f.getKey())
.delayElements(Duration.ofSeconds(1)) // rate limit each group
.flatMap(g -> g) // unwind the group
.subscribe()
;
并且我遇到了Flux挂起的情况,假设因为组的数量大于flatMap
的并发性。我可以增加flatMap
并发性,但没有简单的方法来判断最大可能的大小是多少。相反,我知道由Foo
分组的Foo.key
将在时间/发布顺序上彼此接近,并且宁愿在groupBy Flux与flatMap并发上使用某种时间窗口(并最终与两个不同的组w结束/同样的key()
并不是什么大不了的事。
我猜测groupBy
Flux将不会完成,直到someFastPubisher
onCompletes - 即通量传递给flatMap
只是保持“开放”(尽管他们不太可能得到一个新的事件)。
我可以通过在groupBy中预取Integer.MAX
或Integer.MAX
ing并发来解决这个问题 - 但是有没有办法控制组的“生命”?
是的:您可以将take(Duration)
应用于组以确保它们提前关闭,并且具有相同键的新组将在此之后打开:
source.groupBy(v -> v.intValue() % 2)
.flatMap(group -> group
.take(Duration.ofMillis(1000))
.count()
.map(c -> "group " + group.key() + " size = " + c)
)
.log()
.blockLast();