等待ParallelFlux完成

问题描述 投票:0回答:1

我创建了一个ParallelFlux,然后使用了.sequential(),并希望到那时我可以计算或“减少”并行计算的结果。问题似乎是触发了并行线程,并且没有任何等待。

我有一些需要使用CountDownLatch的东西,但我认为我不必这样做。

TL; DR-无法获得此代码的打印结果:

    Flux.range(0, 1000000)
    .parallel()
    .runOn(Schedulers.elastic())
    .sequential()
    .count()
    .subscribe(System.out::println);
java reactive-programming spring-webflux project-reactor reactor
1个回答
0
投票

在主体中执行该代码时,这很好地演示了事物的异步特性。该操作在弹性调度程序的线程上运行,并订阅触发异步处理,因此它立即返回。]

有两种方法可以将应用程序的末尾与Flux的末尾同步:

doOnNext中打印并使用blockLast()

block *方法通常在main和test中使用,在那里别无选择,只能切换回阻塞模型(即,否则测试/ main将在处理结束之前退出)。

我们切换了在doOnNext中打印每个发出的项目的“副作用”,它专门用于这种用例。然后,使用blockLast()阻塞直到磁通完成。

Flux.range(0, 1000000)
    .parallel()
    .runOn(Schedulers.elastic())
    .sequential()
    .count()
    .doOnNext(System.out::println)
    .blockLast();

subscribe中打印并在CountDownLatch中使用doFinally

这个比较复杂,但是如果您想探索该方法的工作原理,则可以实际使用subscribe。

我们只添加一个doFinally,它会在任何完成信号(取消,错误或完成)上触发。我们使用CountDownLatch阻塞主线程,该主线程将从doFinally内部异步递减计数。

CountDownLatch latch = new CountDownLatch(1);

Flux.range(0, 1000000)
    .parallel()
    .runOn(Schedulers.elastic())
    .sequential()
    .count()
    .doFinally(signal -> latch.countDown())
    .subscribe(System.out::println);

latch.await(10, TimeUnit.SECONDS);
© www.soinside.com 2019 - 2024. All rights reserved.