我创建了一个ParallelFlux
,然后使用了.sequential()
,并希望到那时我可以计算或“减少”并行计算的结果。问题似乎是触发了并行线程,并且没有任何等待。
我有一些需要使用CountDownLatch
的东西,但我认为我不必这样做。
TL; DR-无法获得此代码的打印结果:
Flux.range(0, 1000000)
.parallel()
.runOn(Schedulers.elastic())
.sequential()
.count()
.subscribe(System.out::println);
在主体中执行该代码时,这很好地演示了事物的异步特性。该操作在弹性调度程序的线程上运行,并订阅触发异步处理,因此它立即返回。]
有两种方法可以将应用程序的末尾与Flux
的末尾同步:
在doOnNext
中打印并使用blockLast()
block *方法通常在main和test中使用,在那里别无选择,只能切换回阻塞模型(即,否则测试/ main将在处理结束之前退出)。
我们切换了在doOnNext
中打印每个发出的项目的“副作用”,它专门用于这种用例。然后,使用blockLast()
阻塞直到磁通完成。
Flux.range(0, 1000000) .parallel() .runOn(Schedulers.elastic()) .sequential() .count() .doOnNext(System.out::println) .blockLast();
在
subscribe
中打印并在CountDownLatch
中使用doFinally
这个比较复杂,但是如果您想探索该方法的工作原理,则可以实际使用subscribe。
我们只添加一个doFinally
,它会在任何完成信号(取消,错误或完成)上触发。我们使用CountDownLatch
阻塞主线程,该主线程将从doFinally
内部异步递减计数。
CountDownLatch latch = new CountDownLatch(1);
Flux.range(0, 1000000)
.parallel()
.runOn(Schedulers.elastic())
.sequential()
.count()
.doFinally(signal -> latch.countDown())
.subscribe(System.out::println);
latch.await(10, TimeUnit.SECONDS);