我有三个如下所示的Integer观察者:
第一观察员:
private Observer<Integer> getFirstObserver() {
return new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(LOG_TAG, "onNext First " + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
}
第二观察员:
private Observer<Integer> getSecondObserver() {
return new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(LOG_TAG, "onNext Second " + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
}
ThirdObserver:
private Observer<Integer> getThirdObserver() {
return new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(LOG_TAG, "onNext Third " + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
}
现在,如果我喜欢下面的代码:
void asyncSubjectDemo1() {
Observable<Integer> observable = Observable.just(1, 2, 3, 4);
PublishSubject<Integer> asyncSubject = PublishSubject.create();
observable.subscribe(asyncSubject);
asyncSubject.subscribe(getFirstObserver());
asyncSubject.subscribe(getSecondObserver());
asyncSubject.subscribe(getThirdObserver());
}
没有在Logcat中打印出任何文档所期望的内容
PublishSubject仅向观察者发出订阅之后源Observable发出的那些项目。
但是如果我在创建如下所示的Observable的同时添加observeOn并运行它,则>]
void asyncSubjectDemo1() { Observable<Integer> observable = Observable.just(1, 2, 3, 4).observeOn(AndroidSchedulers.mainThread()); PublishSubject<Integer> asyncSubject = PublishSubject.create(); observable.subscribe(asyncSubject); asyncSubject.subscribe(getFirstObserver()); asyncSubject.subscribe(getSecondObserver()); asyncSubject.subscribe(getThirdObserver()); }
以下为输出
D/MY_LOG: onNext First 1 D/MY_LOG: onNext Second 1 D/MY_LOG: onNext Third 1 D/MY_LOG: onNext First 2 D/MY_LOG: onNext Second 2 D/MY_LOG: onNext Third 2
为什么在这种情况下有歧义?
我有三个整数观察者,如下所示:第一观察者:私有观察者
请阅读PublishSubject的Javadoc:http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/subjects/PublishSubject.html
“ PublishSubject不会保留/缓存项目,因此,新的Observer不会收到任何过去的项目。”
在第一种情况下,您将PublishSubject
订阅到同步源,因此,此时所有项目都在执行甚至到达asyncSubject.subscribe(getFirstObserver());
之前就已经通过了>
[在第二种情况下,现在已计划了源,并且当您向其订阅PublishSubject
时,您将创建一个窗口或种族(取决于该方法的执行位置),因此asyncSubject.subscribe(getFirstObserver());
等都有机会订阅主题,从而在以后接收项目。