我正在尝试创建一个可以订阅
Subject
的 RxJava 2 Observable
,但当订阅的 Observable
由于错误或完成而终止时不会终止。
本质上,这将充当可以订阅其他
Observable
s的事件总线。
执行此操作的正确方法是什么,以避免传播终止并避免泄漏?
这应该可以解决您的问题:https://github.com/JakeWharton/RxRelay/
我认为你最好的选择是使用操作员“中继”,一旦所有项目都已在管道中发出,它就不会取消订阅
/**
* Relay is just an observable which subscribe an observer, but it wont unsubscribe once emit the items. So the pipeline keep open
* It should return 1,2,3,4,5 for first observer and just 3, 4, 5 fot the second observer since default relay emit last emitted item,
* and all the next items passed to the pipeline.
*/
@Test
public void testRelay() throws InterruptedException {
BehaviorRelay<String> relay = BehaviorRelay.create("default");
relay.subscribe(result -> System.out.println("Observer1:" + result));
relay.call("1");
relay.call("2");
relay.call("3");
relay.subscribe(result -> System.out.println("Observer2:" + result));
relay.call("4");
relay.call("5");
}
你可以在这里看到更多中继的例子https://github.com/politrons/reactive/blob/master/src/test/java/rx/relay/Relay.java
在 RxJava3 中,我可以使用 Observable.flatMap() 来解决这个问题:
@Test
void testReplacingObservable() {
Subject<Observable<Integer>> observableStream = UnicastSubject.create();
Observable<Integer> integerStream = observableStream.flatMap(observable -> observable);
TestObserver<Integer> observer = integerStream.test();
// now can keep adding new observables to the source observable stream
observableStream.onNext(Observable.just(0, 1, 2, 3, 4));
observableStream.onNext(Observable.just(5, 6));
observableStream.onNext(Observable.just(7, 8, 9));
observer.assertValueCount(10);
observer.assertValueAt(0, 0);
observer.assertValueAt(5, 5);
observer.assertValueAt(7, 7);
}