代码
const a = new Rx.Subject().do(x => console.log('a'))
const b = a.mapTo(0)
const c = a.mapTo(1)
const d = Rx.Observable.merge(b, c)
d.subscribe(x => console.log('d'))
a.next(3)
和输出
a
d
a
d
为什么打印两次?是不是Rx.Subject热门观察?
Subject
本身很热/共享。
但是:您追加的任何(大多数!)运算符将创建一个新流,前一个流(在本例中为Subject
)作为源 - 然而,新流(对于大多数运算符而言)不热并且只会被制作通过附加一个热运算符(如share
或publish
等)来获得热流
因此,当你share
你的do
,一切都应该按预期工作。
const a = new Rx.Subject().do(x => console.log('a')).share();
const b = a.mapTo(0);
const c = a.mapTo(1);
const d = Rx.Observable.merge(b, c)
d.subscribe(x => console.log('d'));
a.next(3);
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>
你需要了解冷/热可观察和主题。
冷Observable是一个Observable,每次订阅时都会重新执行其订阅处理程序:
const cold = new Observable(function subscribe(observer) {
console.log('subscribed');
observer.next(Math.random());
observer.complete();
});
// > subscribed
// sub 1: 0.1231231231231
cold.subscribe((num) => console.log('sub 1:', num));
// > subscribed
// sub 2: 0.09805969045
cold.subscribe((num) => console.log('sub 2:', num));
一个热的Observable是一个源Observable(冷或其他),它在源和订阅者之间有一个Subject。订阅热Observable时,订阅将在内部透明地路由到内部Subject,并且Subject将订阅源Observable。这可确保源Observable只有一个订阅者(Subject),并且Subject与许多订阅者共享源的值:
const cold = new Observable(function subscribe(observer) {
console.log('subscribed');
observer.next(Math.random());
observer.complete();
});
const hot = cold.publish();
hot.subscribe((num) => console.log('sub 1:', num));
hot.subscribe((num) => console.log('sub 2:', num));
hot.connect(); // <-- this subscribes the inner Subject to the cold source
// > subscribed
// > sub 1: 0.249848935489
// > sub 2: 0.249848935489
您可以通过多播创建一个Observable hot,它接受一个返回Subject的函数,以便在连接时使用。为方便起见,还存在多播的变体(例如发布),其创建特定类型的主题。 publish()
是multicast(() => new Subject())
的便利方法
除了connect()
,它将内部主题订阅到源并返回基础订阅,您可以调用refCount()
,它返回一个Observable。当refCount()
返回的Observable订阅一次时,它将自动在内部调用connect()
,后续订阅将不会重新连接。当所有订阅者取消订阅时,refCount
将自动取消订阅内部主题。 share()
是source.publish().refCount()
的便利方法。
所以,它会工作,
const a = new Rx.Subject().do(x => console.log('a')).share();
const b = a.mapTo(0);
const c = a.mapTo(1);
const d = Rx.Observable.merge(b, c)
d.subscribe(x => console.log('d'));
a.next(3);