从 switchmap 返回多播可观察值

问题描述 投票:0回答:1

我在尝试理解为什么在尝试创建共享可观察值时需要在外部可观察值处使用管道

share
运算符时遇到一些问题。我认为将
share
作为在
switchMap
内返回的可观察值的一部分传递就足够了,但看起来情况并非如此。

const {
  catchError,
  concatMap,
  count,
  debounceTime,
  delay,
  finalize,
  map,
  mapTo,
  mergeMap,
  startWith,
  switchMap,
  tap,
  take,
  toArray,
  takeUntil,
  bufferTime,
  filter,
  bufferWhen,
  share,
  distinctUntilChanged,
} = rxjs.operators;
const {
  fromEventPattern,
  BehaviorSubject,
  Subject,
  of,
  timer,
  interval,
} = rxjs;

const first = new BehaviorSubject(1);
const destroy = new Subject();
const first$ = first.asObservable().pipe(
  map((value) => parseInt(value, 10)),
  distinctUntilChanged(),
  takeUntil(destroy)
);

const createInterval = () =>
  interval(1000).pipe(
    finalize(() => console.log('finalize')),
    share() // here I create multicast observable
  );

const events$ = first$.pipe(
  share(),
  switchMap(() => createInterval())
  //share() // why this is needed outside and the `share` above is not sufficient ?
);

events$
  .pipe(tap((value) => console.log('first', value)))
  .subscribe((value) => console.log('subscription 1', value));

// not subscribing properly without share() above
events$
  .pipe(tap((value) => console.log('second', value)))
  .subscribe((value) => console.log('subscription 2', value));
<script src="https://unpkg.com/rxjs@^7/dist/bundles/rxjs.umd.min.js"></script>

javascript rxjs
1个回答
0
投票

文档中说:

返回一个新的 Observable,它多播(共享)原始的 Observable 可观察到的。只要至少有一个订阅者即可 Observable 将被订阅并发出数据。当所有订阅者 取消订阅它将取消订阅源 Observable。 因为 Observable 是多播的,所以流变得很热。这 是多播(()=>新主题()),refCount()的别名。

因此我们可以将可观察的输出流共享给多个订阅者,而无需创建新的流!

为了演示这一点,这里我有一个代码,其中没有共享,但在 switchMap 上方点击,以指示在每个订阅上执行间隔!第一个订阅会立即发生,但第二个订阅会在 3 秒后发生,你会看到这里有两个流,所以在 API 的上下文中,一个是静态的,我们可以使用 share 来减少调用次数,这只是举个例子!

const {
  catchError,
  concatMap,
  count,
  debounceTime,
  delay,
  finalize,
  map,
  mapTo,
  mergeMap,
  startWith,
  switchMap,
  tap,
  take,
  toArray,
  takeUntil,
  bufferTime,
  filter,
  bufferWhen,
  share,
  distinctUntilChanged,
} = rxjs.operators;
const {
  fromEventPattern,
  BehaviorSubject,
  Subject,
  of ,
  timer,
  interval,
} = rxjs;

const first = new BehaviorSubject(1);
const destroy = new Subject();
const first$ = first.asObservable().pipe(
  map((value) => parseInt(value, 10)),
  distinctUntilChanged(),
  takeUntil(destroy)
);

const createInterval = () =>
  interval(1000).pipe(
    finalize(() => console.log('finalize')),
  );

const events$ = first$.pipe(
  tap(() => console.log('calling')),
  switchMap(() => createInterval()),
  //share() // why this is needed outside and the `share` above is not sufficient ?
);

events$
  .pipe(tap((value) => console.log('first', value)))
  .subscribe((value) => console.log('subscription 1', value));

setTimeout(() => {
  // not subscribing properly without share() above
  events$
    .pipe(tap((value) => console.log('second', value)))
    .subscribe((value) => console.log('subscription 2', value));

}, 3000);
<script src="https://unpkg.com/rxjs@^7/dist/bundles/rxjs.umd.min.js"></script>

我们不需要多个共享,因为上面的代码将被共享,在您的情况下,我认为发生的是您正在与两个订阅者共享行为主体部分(直到共享上方的流!),因为您有不同的订阅者直到此处未更改为止,第一个订阅者会被订阅,因为值已更改,但第二个订阅者不会,因为它在流中具有相同的值!

解决此问题的方法是在内部可观察的

createInterval
中或在
events$
上(作为最后一个管道运算符)拥有单个共享。

最后这是一个工作示例,它运行良好!

const {
  catchError,
  concatMap,
  count,
  debounceTime,
  delay,
  finalize,
  map,
  mapTo,
  mergeMap,
  startWith,
  switchMap,
  tap,
  take,
  toArray,
  takeUntil,
  bufferTime,
  filter,
  bufferWhen,
  share,
  distinctUntilChanged,
} = rxjs.operators;
const {
  fromEventPattern,
  BehaviorSubject,
  Subject,
  of ,
  timer,
  interval,
} = rxjs;

const first = new BehaviorSubject(1);
const destroy = new Subject();
const first$ = first.asObservable().pipe(
  map((value) => parseInt(value, 10)),
  distinctUntilChanged(),
  takeUntil(destroy)
);

const createInterval = () =>
  interval(1000).pipe(
    finalize(() => console.log('finalize')),
    // share(),
  );

const events$ = first$.pipe(
  tap(() => console.log('calling')),
  switchMap(() => createInterval()),
  share(),
);

events$
  .pipe(tap((value) => console.log('first', value)))
  .subscribe((value) => console.log('subscription 1', value));

// not subscribing properly without share() above
events$
  .pipe(tap((value) => console.log('second', value)))
  .subscribe((value) => console.log('subscription 2', value));
<script src="https://unpkg.com/rxjs@^7/dist/bundles/rxjs.umd.min.js"></script>

© www.soinside.com 2019 - 2024. All rights reserved.