带有和不带有ObserveOn的RxJava PublishSubject

问题描述 投票:1回答:1

我有三个如下所示的Integer观察者:

第一观察员:

 private Observer<Integer> getFirstObserver() {
        return new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(LOG_TAG, "onNext First " + integer);

            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {

            }
        };
    }

第二观察员:

private Observer<Integer> getSecondObserver() {
        return new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(LOG_TAG, "onNext Second " + integer);

            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
            }
        };
    }

ThirdObserver:

    private Observer<Integer> getThirdObserver() {
    return new Observer<Integer>() {

        @Override
        public void onSubscribe(Disposable d) {
        }

        @Override
        public void onNext(Integer integer) {
            Log.d(LOG_TAG, "onNext Third " + integer);

        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onComplete() {
        }
    };
}

现在,如果我喜欢下面的代码:

    void asyncSubjectDemo1() {
        Observable<Integer> observable = Observable.just(1, 2, 3, 4);
        PublishSubject<Integer> asyncSubject = PublishSubject.create();
        observable.subscribe(asyncSubject);
        asyncSubject.subscribe(getFirstObserver());
        asyncSubject.subscribe(getSecondObserver());
        asyncSubject.subscribe(getThirdObserver());

    }

没有在Logcat中打印出任何文档所期望的内容

PublishSubject仅向观察者发出订阅之后源Observable发出的那些项目。

但是如果我在创建如下所示的Observable的同时添加observeOn并运行它,则>]

void asyncSubjectDemo1() {
            Observable<Integer> observable = Observable.just(1, 2, 3, 4).observeOn(AndroidSchedulers.mainThread());
            PublishSubject<Integer> asyncSubject = PublishSubject.create();
            observable.subscribe(asyncSubject);
            asyncSubject.subscribe(getFirstObserver());
            asyncSubject.subscribe(getSecondObserver());
            asyncSubject.subscribe(getThirdObserver());

        }

以下为输出

D/MY_LOG: onNext First 1
D/MY_LOG: onNext Second 1
D/MY_LOG: onNext Third 1
D/MY_LOG: onNext First 2
D/MY_LOG: onNext Second 2
D/MY_LOG: onNext Third 2

为什么在这种情况下有歧义?

我有三个整数观察者,如下所示:第一观察者:私有观察者getFirstObserver(){返回新的观察者(){@Override ...

rx-java reactivex rx-java3
1个回答
0
投票

请阅读PublishSubject的Javadoc:http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/subjects/PublishSubject.html

“ PublishSubject不会保留/缓存项目,因此,新的Observer不会收到任何过去的项目。”

在第一种情况下,您将PublishSubject订阅到同步源,因此,此时所有项目都在执行甚至到达asyncSubject.subscribe(getFirstObserver());之前就已经通过了>

[在第二种情况下,现在已计划了源,并且当您向其订阅PublishSubject时,您将创建一个窗口或种族(取决于该方法的执行位置),因此asyncSubject.subscribe(getFirstObserver());等都有机会订阅主题,从而在以后接收项目。

© www.soinside.com 2019 - 2024. All rights reserved.