从 Flux 的某些元素创建 Mono

问题描述 投票:0回答:1

我正在尝试从 Flux 的元素创建 Mono。这个单声道需要包含一定数量的元素。

我找到了解决办法,就是使用Mono.create方法,flux的takeUntil和onCancel方法。

private <T> void create(MonoSink<Map<String, Optional<T>>> monoSink,
                            Flux<Map<String, Optional<T>>> flux,
                            List<String> queryParams) {
        Map<String, Optional<T>> result = new HashMap<>();
        flux
                .doOnNext(g -> result.putAll(subset(g, queryParams)))
                .doOnCancel(() -> monoSink.success(result))
                .takeUntil(f -> result.keySet().containsAll(queryParams))
                .subscribe();
    }

然后我调用它:

Mono.create(monoSink -> create(monoSink, flux, params))

通量是共享的,因此当我们调用 takeUntil 时,它并没有真正被取消。

这是一种不好的做法吗?

谢谢你

java spring-webflux reactive-programming project-reactor
1个回答
0
投票

您的用例是带有扭曲的reduction操作:您只想部分消耗输入流。在这种情况下,我认为简单而明智的(对于发布者生命周期)解决方案是使用 scanWith 运算符:

  1. 与归约一样,它允许将流元素一一聚合/累积
  2. 但是,与归约相反,它为每个累积元素返回中间状态的通量。

takeUntil
+
last()
结合,它应该提供您所需要的:

public Mono<Map<String, T>> extractParameters(Flux<Map<String, T>> flux, List<String> queryParams) {
       return flux
           .scanWith(HashMap::new, 
                     result, g -> {
                         result.putAll(subset(g, queryParams));
                         return result;
                     })
           .takeUntil(result -> result.keySet().containsAll(queryParams))
           .last();
}
© www.soinside.com 2019 - 2024. All rights reserved.