我需要合并两个被动发布者的结果-Mono和Flux。我尝试使用zip
和join
函数来执行此操作,但是我无法满足两个特定条件:
join
来实现此条件)第一个条件的解决方案在Combine Mono with Flux条目中显示(粘贴如下)。但是我不阻止链条就无法达到第二个条件-我想避免。
Flux<Integer> flux = Flux.concat(Mono.just(1).delayElement(Duration.ofMillis(100)),
Mono.just(2).delayElement(Duration.ofMillis(500))).log();
Mono<String> mono = Mono.just("a").delayElement(Duration.ofMillis(50)).log();
List<String> list = flux.join(mono, (v1) -> Flux.never(), (v2) -> Flux.never(), (x, y) -> {
return x + y;
}).collectList().block();
System.out.println(list);
您可以执行以下操作
DirectProcessor<Integer> processor = DirectProcessor.create();
//Could omit sink, and use processor::onComplete in place of sink::complete
//But typically recommended as provides better thread safety
FluxSink<Integer> sink = processor.serialize().sink();
Flux<Integer> flux = Flux.concat(Mono.just(1).delayElement(Duration.ofMillis(100)),
Mono.just(2).delayElement(Duration.ofMillis(500))).log();
//Uncomment below and comment out above statement for empty flux
//Flux<Integer> flux = Flux.empty();
Mono<String> mono = Mono.just("a").delayElement(Duration.ofMillis(50));
List<String> list = flux
.doOnComplete(sink::complete)
.join(mono, s -> processor , s -> processor, (x, y) -> x + y).collectList().block();
System.out.println(list);