Rx.Subject是一个热门观察者吗?

问题描述 投票:2回答:2

代码

const a = new Rx.Subject().do(x => console.log('a'))
const b = a.mapTo(0)
const c = a.mapTo(1)
const d = Rx.Observable.merge(b, c)
d.subscribe(x => console.log('d'))
a.next(3)

和输出

a
d
a
d

为什么打印两次?是不是Rx.Subject热门观察?

rxjs rxjs5
2个回答
7
投票

Subject本身很热/共享。

但是:您追加的任何(大多数!)运算符将创建一个新流,前一个流(在本例中为Subject)作为源 - 然而,新流(对于大多数运算符而言)不热并且只会被制作通过附加一个热运算符(如sharepublish等)来获得热流

因此,当你share你的do,一切都应该按预期工作。

const a = new Rx.Subject().do(x => console.log('a')).share();
const b = a.mapTo(0);
const c = a.mapTo(1);
const d = Rx.Observable.merge(b, c)
d.subscribe(x => console.log('d'));
a.next(3);
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>

0
投票

你需要了解冷/热可观察和主题。

冷Observable是一个Observable,每次订阅时都会重新执行其订阅处理程序:

const cold = new Observable(function subscribe(observer) {
  console.log('subscribed');
  observer.next(Math.random());
  observer.complete();
});
// > subscribed
// sub 1: 0.1231231231231
cold.subscribe((num) => console.log('sub 1:', num));
// > subscribed
// sub 2: 0.09805969045
cold.subscribe((num) => console.log('sub 2:', num));

一个热的Observable是一个源Observable(冷或其他),它在源和订阅者之间有一个Subject。订阅热Observable时,订阅将在内部透明地路由到内部Subject,并且Subject将订阅源Observable。这可确保源Observable只有一个订阅者(Subject),并且Subject与许多订阅者共享源的值:

const cold = new Observable(function subscribe(observer) {
  console.log('subscribed');
  observer.next(Math.random());
  observer.complete();
});

const hot = cold.publish();
hot.subscribe((num) => console.log('sub 1:', num));
hot.subscribe((num) => console.log('sub 2:', num));
hot.connect(); // <-- this subscribes the inner Subject to the cold source
// > subscribed
// > sub 1: 0.249848935489
// > sub 2: 0.249848935489

您可以通过多播创建一个Observable hot,它接受一个返回Subject的函数,以便在连接时使用。为方便起见,还存在多播的变体(例如发布),其创建特定类型的主题。 publish()multicast(() => new Subject())的便利方法

除了connect(),它将内部主题订阅到源并返回基础订阅,您可以调用refCount(),它返回一个Observable。当refCount()返回的Observable订阅一次时,它将自动在内部调用connect(),后续订阅将不会重新连接。当所有订阅者取消订阅时,refCount将自动取消订阅内部主题。 share()source.publish().refCount()的便利方法。

所以,它会工作,

const a = new Rx.Subject().do(x => console.log('a')).share();
const b = a.mapTo(0);
const c = a.mapTo(1);
const d = Rx.Observable.merge(b, c)
d.subscribe(x => console.log('d'));
a.next(3);
© www.soinside.com 2019 - 2024. All rights reserved.