虽然我已经写了很多年的Java代码,但我几乎没有用RxJava做任何工作,我需要了解如何将其映射到预期的结果。 在我工作的服务中,我有很多现有的代码,但我不相信他们正确使用RxJava。
请注意,我们使用的是旧版本的RxJava,2.1.10。 我现在无法升级。
以下是我在我们的代码库中看到的一种常见模式。
Single<ResultType> result1 = Single.<ResultType>create(source -> {
source.onSuccess(method1(parameters));
}).subscribeOn(Schedulers.io());
Single<ReturnType> result2 = Single.<ResultType>create(source -> {
source.onSuccess(method2(parameters));
}).subscribeOn(Schedulers.io());
if (null != result1 && null != result2) {
这样做的目的是 "method1 "和 "method2 "的执行是并行的,"null != result1 &&null != result2 "的检查发生在两个方法都执行完之后。我在想,有可能这两个意图在这里都没有实现,但我需要确认这一点,也需要如何正确实现这些目标。
根据你的源代码的设置,你可以使用 combineLatest()
来等待两个来源的结果。一个概念验证的示例代码可能是这样的。
public static void main(String[] args) throws Exception {
Callable<Integer> c1 = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println(System.currentTimeMillis()+"|Starting first");
Thread.sleep(1111);
System.out.println(System.currentTimeMillis()+"|finished first");
return 42;
}};
Single<Integer> singleFirst = Single.fromCallable(c1).subscribeOn(Schedulers.newThread());
Callable<Integer> c2 = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println(System.currentTimeMillis()+"|Starting second");
Thread.sleep(5555);
System.out.println(System.currentTimeMillis()+"|finished second");
return 12;
}};
Single<Integer> singleSecond = Single.fromCallable(c2).subscribeOn(Schedulers.newThread());
BiFunction<Integer, Integer, Integer> func = (a,b) -> a+b;
ObservableSource<Integer> source1 = singleFirst.toObservable();
ObservableSource<Integer> source2 = singleSecond.toObservable();
Observable<Integer> resultSource = Observable.combineLatest(source1, source2, func);
System.out.println(System.currentTimeMillis()+"|All setup, wait for completion");
resultSource.blockingSubscribe(r -> {
System.out.println(System.currentTimeMillis()+"|Result is: "+r);
});
}
这可能会产生以下输出
1589229378890|All setup, wait for completion
1589229378895|Starting second
1589229378895|Starting first
1589229380007|finished first
1589229384451|finished second
1589229384452|Result is: 54
正如你所看到的 Single
订阅并行运行,它们的值被 "收集 "到了一个 "收集 "中。combineLatest()
在最后叫。