RxJava`Completable.andThen`没有连续执行?

问题描述 投票:4回答:2

我有一个用例,我在一个Completable中初始化一些全局变量,并在链的下一步(使用andThen运算符)我使用这些变量。

下面的示例详细解释了我的用例

假设你有一个班级User

        class User {
            String name;
        }

我有一个像这样的Observable,

        private User mUser; // this is a global variable

        public Observable<String> stringObservable() {
            return Completable.fromAction(() -> {
                mUser = new User();
                mUser.name = "Name";
            }).andThen(Observable.just(mUser.name));
        }           

首先,我在我的Completable.fromAction做了一些初始化,我希望andThen算子只在完成Completable.fromAction之后启动。

这意味着当mUser操作符启动时,我期望andThen被初始化。

以下是我对此观察的订阅

             stringObservable()
            .subscribe(s -> Log.d(TAG, "success: " + s),
                    throwable -> Log.e(TAG, "error: " + throwable.getMessage()));

但是,当我运行此代码时,我收到一个错误

          Attempt to read from field 'java.lang.String User.name' on a null object reference

这意味着mUser为null,andThen在执行Completable.fromAction中的代码之前就开始了。这里发生了什么事?

根据andThen的文件

返回一个Observable,它将订阅此Completable,一旦完成,就会订阅{@code next} ObservableSource。此Completable中的错误事件将传播到下游订户,并将导致跳过Observable的订阅。

java rx-java rx-java2
2个回答
28
投票

问题不在于andThen,而在于Observable.just(mUser.name)中的andThenjust运算符将尝试立即创建observable虽然它只会在Completable.fromAction之后发出。

这里的问题是,在尝试使用just创建Observable时,mUser为null。

解决方案:您需要推迟创建String Observable直到订阅发生,直到andThen的上游开始发射。

而不是andThen(Observable.just(mUser.name));

使用

 andThen(Observable.defer(() -> Observable.just(mUser.name)));

要么

 andThen(Observable.fromCallable(() -> mUser.name));

3
投票

我不认为@Sarath Kn的答案是100%正确的。是的just将在它被召唤时立即产生可观察性,但是andThen仍然在意外时间调用just

我们可以将andThenflatMap进行比较,以便更好地理解。这是一个完全可运行的测试:

package com.example;

import org.junit.Test;

import io.reactivex.Completable;
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.Schedulers;

public class ExampleTest {

    @Test
    public void createsIntermediateObservable_AfterSubscribing() {
        Observable<String> coldObservable = getObservableSource()
                .flatMap(integer -> getIntermediateObservable())
                .subscribeOn(Schedulers.trampoline())
                .observeOn(Schedulers.trampoline());
        System.out.println("Cold obs created... subscribing");
        TestObserver<String> testObserver = coldObservable.test();
        testObserver.awaitTerminalEvent();

        /*
        Resulting logs:

        Creating observable source
        Cold obs created... subscribing
        Emitting 1,2,3
        Creating intermediate observable
        Creating intermediate observable
        Creating intermediate observable
        Emitting complete notification

        IMPORTANT: see that intermediate observables are created AFTER subscribing
         */
    }

    @Test
    public void createsIntermediateObservable_BeforeSubscribing() {
        Observable<String> coldObservable = getCompletableSource()
                .andThen(getIntermediateObservable())
                .subscribeOn(Schedulers.trampoline())
                .observeOn(Schedulers.trampoline());
        System.out.println("Cold obs created... subscribing");
        TestObserver<String> testObserver = coldObservable.test();
        testObserver.awaitTerminalEvent();

        /*
        Resulting logs:

        Creating completable source
        Creating intermediate observable
        Cold obs created... subscribing
        Emitting complete notification

        IMPORTANT: see that intermediate observable is created BEFORE subscribing =(
         */
    }

    private Observable<Integer> getObservableSource() {
        System.out.println("Creating observable source");
        return Observable.create(emitter -> {
            System.out.println("Emitting 1,2,3");
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
            System.out.println("Emitting complete notification");
            emitter.onComplete();
        });
    }

    private Observable<String> getIntermediateObservable() {
        System.out.println("Creating intermediate observable");
        return Observable.just("A");
    }

    private Completable getCompletableSource() {
        System.out.println("Creating completable source");
        return Completable.create(emitter -> {
            System.out.println("Emitting complete notification");
            emitter.onComplete();
        });
    }
}

您可以看到,当我们使用flatmap时,just在订阅后被调用,这是有道理的。如果中间可观察量取决于发射到flatmap的项目,那么系统当然不能在订阅之前创建中间可观察对象。它还没有任何价值。如果flatmap在订阅之前调用just,你可以想象这不起作用:

.flatMap(integer -> getIntermediateObservable(integer))

奇怪的是,andThen能够在订阅之前创建它的内部可观察性(即调用just)。有意义的是,它可以做到这一点。 andThen将要接收的唯一内容是完整的通知,因此没有理由不尽早创建中间可观察对象。唯一的问题是它不是预期的行为。

@Sarath Kn的解决方案是正确的,但出于错误的原因。如果我们使用defer,我们可以看到事情按预期工作:

@Test
public void usingDefer_CreatesIntermediateObservable_AfterSubscribing() {
    Observable<String> coldObservable = getCompletableSource()
            .andThen(Observable.defer(this::getIntermediateObservable))
            .subscribeOn(Schedulers.trampoline())
            .observeOn(Schedulers.trampoline());
    System.out.println("Cold obs created... subscribing");
    TestObserver<String> testObserver = coldObservable.test();
    testObserver.awaitTerminalEvent();

    /*
    Resulting logs:

    Creating completable source
    Cold obs created... subscribing
    Emitting complete notification
    Creating intermediate observable

    IMPORTANT: see that intermediate observable is created AFTER subscribing =) YEAY!!
     */
}
© www.soinside.com 2019 - 2024. All rights reserved.