有条件地将单声道与磁通结合起来

问题描述 投票:0回答:1

我需要合并两个被动发布者的结果-Mono和Flux。我尝试使用zipjoin函数来执行此操作,但是我无法满足两个特定条件:

  1. 结果应该包含与Flux发出的元素一样多的元素,但是相应的Mono源应该仅被调用一次(仅可以使用join来实现此条件)
  2. 当Flux为空时,链应该完成而无需等待Mono元素

第一个条件的解决方案在Combine Mono with Flux条目中显示(粘贴如下)。但是我不阻止链条就无法达到第二个条件-我想避免。

Flux<Integer> flux = Flux.concat(Mono.just(1).delayElement(Duration.ofMillis(100)),
        Mono.just(2).delayElement(Duration.ofMillis(500))).log();

Mono<String> mono = Mono.just("a").delayElement(Duration.ofMillis(50)).log();

List<String> list = flux.join(mono, (v1) -> Flux.never(), (v2) -> Flux.never(), (x, y) -> {
    return x + y;
}).collectList().block();

System.out.println(list);
java project-reactor
1个回答
0
投票

您可以执行以下操作

DirectProcessor<Integer> processor = DirectProcessor.create();
//Could omit sink, and use processor::onComplete in place of sink::complete
//But typically recommended as provides better thread safety
FluxSink<Integer> sink = processor.serialize().sink();

Flux<Integer> flux = Flux.concat(Mono.just(1).delayElement(Duration.ofMillis(100)),
    Mono.just(2).delayElement(Duration.ofMillis(500))).log();

//Uncomment below and comment out above statement for empty flux

//Flux<Integer> flux = Flux.empty();

Mono<String> mono = Mono.just("a").delayElement(Duration.ofMillis(50));

List<String> list = flux
     .doOnComplete(sink::complete)
     .join(mono, s -> processor , s -> processor, (x, y) -> x + y).collectList().block();

System.out.println(list);
© www.soinside.com 2019 - 2024. All rights reserved.