如何使用 Flowable 和 Single 并行运行一组可调用对象

问题描述 投票:0回答:0

我有 4 个阻塞操作,想使用 RxJava Single/Flowable 并行运行它们。

我写的检查并行度的测试代码是:

final Single<Boolean> f1 = Single.fromCallable(() -> {
    TimeUnit.SECONDS.sleep(2);
    System.out.println("f1 " + Thread.currentThread().getName());
    return true;
});
final Single<Boolean> f2 = Single.fromCallable(() -> {
    System.out.println("f2 " + Thread.currentThread().getName());
    return true;
});
final Single<Boolean> f3 = Single.fromCallable(() -> {
    TimeUnit.SECONDS.sleep(1);
    System.out.println("f3 " + Thread.currentThread().getName());
    return true;
});
final Single<Boolean> f4 = Single.fromCallable(() -> {
    System.out.println("f4 " + Thread.currentThread().getName());
    return true;
});

Flowable.fromArray(f1, f2, f3, f4)
        .flatMap(Single::toFlowable)
        .parallel(4)
        .runOn(Schedulers.io())
        .sequential()
        .ignoreElements()
        .blockingAwait();

但是它们都在同一个线程上运行,依次给出以下输出:

f1 Test worker
f2 Test worker
f3 Test worker
f4 Test worker

尽管 f1 和 f3 的时间延迟...

使用

Flowable.parallel
可以很好地处理可流动的值,但现在在这种情况下。所以我错过了一些东西。

我怎样才能让我的代码工作,或者我怎样才能使用 RxJava 以优雅的方式并行运行可调用对象?

java parallel-processing rx-java rx-java2 rx-java3
© www.soinside.com 2019 - 2024. All rights reserved.