RxJS 5,将一个observable转换为一个BehaviorSubject(?)

问题描述 投票:13回答:2

我有一个父观察,一旦它有一个订阅者,将进行查找并发出一个值,然后完成。

我想将其转换为执行以下操作的可观察(或行为主题或其他任何工作):一旦它至少有一个订阅者,它就会从父可观察对象(一次)获得结果。然后它向所有订阅者发出该值,并在订阅时向所有未来订阅者发出该值。即使其订户数量降至零,它仍应继续此行为。

看起来这应该很容易。这是不起作用的:

theValue$: Observable<boolean> = parent$
.take(1)
.share()

其他不起作用的东西:publishReplay()publish()。更好的东西:

theValue$ = new BehaviorSubject<boolean>(false);

parent$
.take(1)
.subscribe( value => theValue$.next(value));

但是这种方法存在一个问题:parent$theValue$获得第一个订阅者之前订阅了。

有没有更好的方法来处理这个?

rxjs rxjs5
2个回答
15
投票

shareReplay应该做你想做的事:

import 'rxjs/add/operator/shareReplay';
...
theValue$: Observable<boolean> = parent$.shareReplay(1);

shareReplay在RxJS版本5.4.0中添加。它返回一个引用计数的observable,它将订阅源 - parent$ - 在第一次订阅时。在源完成后进行的订阅将收到重播通知。

shareReplay - 和refCount一般 - 在我最近写的一篇文章中有更详细的解释:RxJS: How to Use refCount


6
投票

我已经实现了一个将Observables转换为BehaviorSubjects的方法,因为我认为shareReplay方法对于将来的引用来说不太可读。

import { Observable } from 'rxjs/Observable';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';

export function convertObservableToBehaviorSubject<T>(observable: Observable<T>, initValue: T): BehaviorSubject<T> {
    const subject = new BehaviorSubject(initValue);

    observable.subscribe(
        (x: T) => {
            subject.next(x);
        },
        (err: any) => {
            subject.error(err);
        },
        () => {
            subject.complete();
        },
    );

    return subject;
}
© www.soinside.com 2019 - 2024. All rights reserved.