如何保证rxjava方法并行执行,并完成?

问题描述 投票:0回答:1

虽然我已经写了很多年的Java代码,但我几乎没有用RxJava做任何工作,我需要了解如何将其映射到预期的结果。 在我工作的服务中,我有很多现有的代码,但我不相信他们正确使用RxJava。

请注意,我们使用的是旧版本的RxJava,2.1.10。 我现在无法升级。

以下是我在我们的代码库中看到的一种常见模式。

Single<ResultType> result1 = Single.<ResultType>create(source -> {
    source.onSuccess(method1(parameters));
}).subscribeOn(Schedulers.io());
Single<ReturnType> result2 = Single.<ResultType>create(source -> {
    source.onSuccess(method2(parameters));
}).subscribeOn(Schedulers.io());

if (null != result1 && null != result2) {

这样做的目的是 "method1 "和 "method2 "的执行是并行的,"null != result1 &&null != result2 "的检查发生在两个方法都执行完之后。我在想,有可能这两个意图在这里都没有实现,但我需要确认这一点,也需要如何正确实现这些目标。

java rx-java2
1个回答
1
投票

根据你的源代码的设置,你可以使用 combineLatest() 来等待两个来源的结果。一个概念验证的示例代码可能是这样的。

public static void main(String[] args) throws Exception {
    Callable<Integer> c1 = new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            System.out.println(System.currentTimeMillis()+"|Starting first");
            Thread.sleep(1111);
            System.out.println(System.currentTimeMillis()+"|finished first");
            return 42;
        }};
    Single<Integer> singleFirst = Single.fromCallable(c1).subscribeOn(Schedulers.newThread());

    Callable<Integer> c2 = new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            System.out.println(System.currentTimeMillis()+"|Starting second");
            Thread.sleep(5555);
            System.out.println(System.currentTimeMillis()+"|finished second");
            return 12;
        }};
    Single<Integer> singleSecond = Single.fromCallable(c2).subscribeOn(Schedulers.newThread());
    BiFunction<Integer, Integer, Integer> func = (a,b) -> a+b;

    ObservableSource<Integer> source1 = singleFirst.toObservable();
    ObservableSource<Integer> source2 = singleSecond.toObservable();
    Observable<Integer> resultSource = Observable.combineLatest(source1, source2, func);
    System.out.println(System.currentTimeMillis()+"|All setup, wait for completion");
    resultSource.blockingSubscribe(r -> {
        System.out.println(System.currentTimeMillis()+"|Result is: "+r);
    });
}

这可能会产生以下输出

1589229378890|All setup, wait for completion
1589229378895|Starting second
1589229378895|Starting first
1589229380007|finished first
1589229384451|finished second
1589229384452|Result is: 54

正如你所看到的 Single 订阅并行运行,它们的值被 "收集 "到了一个 "收集 "中。combineLatest() 在最后叫。

© www.soinside.com 2019 - 2024. All rights reserved.