我有 4 个阻塞操作,想使用 RxJava Single/Flowable 并行运行它们。
我写的检查并行度的测试代码是:
final Single<Boolean> f1 = Single.fromCallable(() -> {
TimeUnit.SECONDS.sleep(2);
System.out.println("f1 " + Thread.currentThread().getName());
return true;
});
final Single<Boolean> f2 = Single.fromCallable(() -> {
System.out.println("f2 " + Thread.currentThread().getName());
return true;
});
final Single<Boolean> f3 = Single.fromCallable(() -> {
TimeUnit.SECONDS.sleep(1);
System.out.println("f3 " + Thread.currentThread().getName());
return true;
});
final Single<Boolean> f4 = Single.fromCallable(() -> {
System.out.println("f4 " + Thread.currentThread().getName());
return true;
});
Flowable.fromArray(f1, f2, f3, f4)
.flatMap(Single::toFlowable)
.parallel(4)
.runOn(Schedulers.io())
.sequential()
.ignoreElements()
.blockingAwait();
但是它们都在同一个线程上运行,依次给出以下输出:
f1 Test worker
f2 Test worker
f3 Test worker
f4 Test worker
尽管 f1 和 f3 的时间延迟...
使用
Flowable.parallel
可以很好地处理可流动的值,但现在在这种情况下。所以我错过了一些东西。
我怎样才能让我的代码工作,或者我怎样才能使用 RxJava 以优雅的方式并行运行可调用对象?