我跑了这个:
Mono<Void> mono = Mono.empty();
System.out.println("mono.block: " + mono.block());
它产生:
mono.block: null
正如所料。换句话说,如果block
已经完成,调用Mono
将立即返回。
另一个例子,类似于现实世界的场景。我有一个光通量,例如:
Flux<Integer> ints = Flux.range(0, 2);
我制作了一个可连接的通量,我将使用它来允许多个订阅者:
ConnectableFlux<Integer> publish = ints.publish();
对于这个例子,假设有一个真实的用户:
publish
.doOnComplete(() -> System.out.println("publish completed"))
.subscribe();
和另一个只产生元素数的订阅者:
Mono<Long> countMono = publish
.doOnComplete(() -> System.out.println("countMono completed"))
.count();
countMono.subscribe();
我连接可连接的焊剂并打印元素数:
publish.connect();
System.out.println("block");
long count = countMono.block();
System.out.println("count: " + count);
这打印:
publish completed
countMono completed
block
换句话说,两个订阅者都成功订阅并完成,但随后countMono.block()
无限期地阻止。
为什么这样,我如何使这项工作?我的最终目标是获得元素的数量。
您可以使用autoConnect
或refCount
而不是手动调用connect()
来实现此功能。
例如:
Flux<Integer> ints = Flux.range(0, 2);
Flux<Integer> publish = ints.publish()
.autoConnect(2); // new
publish
.doOnComplete(() -> System.out.println("publish completed"))
.subscribe();
Mono<Long> countMono = publish
.doOnComplete(() -> System.out.println("countMono completed"))
.count();
// countMono.subscribe();
long count = countMono.block();
System.out.println("count: " + count);
以下是我认为在你的例子中发生的事情......但这是基于我有限的知识,而且我不是100%确定它是正确的。
.publish()
将上游源变为热流.connect()
订阅了上游,并启动了流程connect()
完成之前注册的两个订阅(因为这一切都发生在主线程中)block()
创建了一个新订阅。connect()
(从另一个线程,因为主线程被阻止),数据将再次流动,并且block()
将完成。但是,这将是一个新序列(不是在步骤4中完成的原始序列)只创建了两个订阅(而不是示例中的3个),一个来自.subscribe()
调用,另一个来自.block()
。 ConnectableFlux在2次订阅后自动连接,因此block()
订阅完成。两个订阅共享相同的上游序列。