Mono.count无限期阻止

问题描述 投票:0回答:1

我跑了这个:

Mono<Void> mono = Mono.empty();

System.out.println("mono.block: " + mono.block());

它产生:

mono.block: null    

正如所料。换句话说,如果block已经完成,调用Mono将立即返回。

另一个例子,类似于现实世界的场景。我有一个光通量,例如:

Flux<Integer> ints = Flux.range(0, 2);

我制作了一个可连接的通量,我将使用它来允许多个订阅者:

ConnectableFlux<Integer> publish = ints.publish();

对于这个例子,假设有一个真实的用户:

publish
   .doOnComplete(() -> System.out.println("publish completed"))
   .subscribe();

和另一个只产生元素数的订阅者:

Mono<Long> countMono = publish
   .doOnComplete(() -> System.out.println("countMono completed"))
   .count();

countMono.subscribe();

我连接可连接的焊剂并打印元素数:

publish.connect();

System.out.println("block");

long count = countMono.block();

System.out.println("count: " + count);

这打印:

publish completed
countMono completed
block

换句话说,两个订阅者都成功订阅并完成,但随后countMono.block()无限期地阻止。

为什么这样,我如何使这项工作?我的最终目标是获得元素的数量。

java project-reactor
1个回答
1
投票

您可以使用autoConnectrefCount而不是手动调用connect()来实现此功能。

例如:

        Flux<Integer> ints = Flux.range(0, 2);
        Flux<Integer> publish = ints.publish()
                .autoConnect(2);  // new 
        publish
                .doOnComplete(() -> System.out.println("publish completed"))
                .subscribe();
        Mono<Long> countMono = publish
                .doOnComplete(() -> System.out.println("countMono completed"))
                .count();
        // countMono.subscribe();
        long count = countMono.block();
        System.out.println("count: " + count);

Why does your example not work?

以下是我认为在你的例子中发生的事情......但这是基于我有限的知识,而且我不是100%确定它是正确的。

  1. .publish()将上游源变为热流
  2. 然后你订阅了两次(但是这些还没有启动流程,因为可连接的通量还没有连接到上游)
  3. .connect()订阅了上游,并启动了流程
  4. 上游,以及在connect()完成之前注册的两个订阅(因为这一切都发生在主线程中)
  5. 此时ConnectableFlux不再连接到上游,因为上游已经完成(反应堆文档详细说明了当上游源完成后新订阅到达时ConnectableFlux会发生什么,所以这就是我不是100%肯定。)
  6. block()创建了一个新订阅。
  7. 但由于ConnectableFlux不再连接,因此没有数据流动
  8. 如果你再次调用connect()(从另一个线程,因为主线程被阻止),数据将再次流动,并且block()将完成。但是,这将是一个新序列(不是在步骤4中完成的原始序列)

Why does my example work?

只创建了两个订阅(而不是示例中的3个),一个来自.subscribe()调用,另一个来自.block()。 ConnectableFlux在2次订阅后自动连接,因此block()订阅完成。两个订阅共享相同的上游序列。

© www.soinside.com 2019 - 2024. All rights reserved.