创建一个不会在完成或错误时终止的 RxJava 主题

问题描述 投票:0回答:3

我正在尝试创建一个可以订阅

Subject
的 RxJava 2
Observable
,但当订阅的
Observable
由于错误或完成而终止时不会终止。

本质上,这将充当可以订阅其他

Observable
s的事件总线。

执行此操作的正确方法是什么,以避免传播终止并避免泄漏?

rx-java rx-java2
3个回答
1
投票

这应该可以解决您的问题:https://github.com/JakeWharton/RxRelay/


0
投票

我认为你最好的选择是使用操作员“中继”,一旦所有项目都已在管道中发出,它就不会取消订阅

   /**
     * Relay is just an observable which subscribe an observer, but it wont unsubscribe once emit the items. So the pipeline keep open
     * It should return 1,2,3,4,5 for first observer and just 3, 4, 5 fot the second observer since default relay emit last emitted item,
     * and all the next items passed to the pipeline.
     */
    @Test
    public void testRelay() throws InterruptedException {
        BehaviorRelay<String> relay = BehaviorRelay.create("default");
        relay.subscribe(result -> System.out.println("Observer1:" + result));
        relay.call("1");
        relay.call("2");
        relay.call("3");
        relay.subscribe(result -> System.out.println("Observer2:" + result));
        relay.call("4");
        relay.call("5");
    }

你可以在这里看到更多中继的例子https://github.com/politrons/reactive/blob/master/src/test/java/rx/relay/Relay.java


0
投票

在 RxJava3 中,我可以使用 Observable.flatMap() 来解决这个问题:

    @Test
    void testReplacingObservable() {
        Subject<Observable<Integer>> observableStream = UnicastSubject.create();
        Observable<Integer> integerStream = observableStream.flatMap(observable -> observable);
        TestObserver<Integer> observer = integerStream.test();

        // now can keep adding new observables to the source observable stream
        observableStream.onNext(Observable.just(0, 1, 2, 3, 4));
        observableStream.onNext(Observable.just(5, 6));
        observableStream.onNext(Observable.just(7, 8, 9));

        observer.assertValueCount(10);
        observer.assertValueAt(0, 0);
        observer.assertValueAt(5, 5);
        observer.assertValueAt(7, 7);
    }
© www.soinside.com 2019 - 2024. All rights reserved.