我正在尝试从 Flux 的元素创建 Mono。这个单声道需要包含一定数量的元素。
我找到了解决办法,就是使用Mono.create方法,flux的takeUntil和onCancel方法。
private <T> void create(MonoSink<Map<String, Optional<T>>> monoSink,
Flux<Map<String, Optional<T>>> flux,
List<String> queryParams) {
Map<String, Optional<T>> result = new HashMap<>();
flux
.doOnNext(g -> result.putAll(subset(g, queryParams)))
.doOnCancel(() -> monoSink.success(result))
.takeUntil(f -> result.keySet().containsAll(queryParams))
.subscribe();
}
然后我调用它:
Mono.create(monoSink -> create(monoSink, flux, params))
通量是共享的,因此当我们调用 takeUntil 时,它并没有真正被取消。
这是一种不好的做法吗?
谢谢你
您的用例是带有扭曲的reduction操作:您只想部分消耗输入流。在这种情况下,我认为简单而明智的(对于发布者生命周期)解决方案是使用 scanWith 运算符:
与
takeUntil
+ last()
结合,它应该提供您所需要的:
public Mono<Map<String, T>> extractParameters(Flux<Map<String, T>> flux, List<String> queryParams) {
return flux
.scanWith(HashMap::new,
result, g -> {
result.putAll(subset(g, queryParams));
return result;
})
.takeUntil(result -> result.keySet().containsAll(queryParams))
.last();
}