如果source1超时并且source1从source1反向发射,则RxJS从source2发出

问题描述 投票:2回答:2

我有一个Observable源source1发出值,如果它不发射超过2秒的任何东西,我想切换到后备源source2。如果source1再次发射,我想从它发射。等等,无限期。

到目前为止,我有以下内容

import { timeout, catchError, takeUntil, concat } from 'rxjs/operators';

declare const source1: Observable;
declare const source2: Observable;

source1.pipe(
    timeout(2000),
    catchError(() => {
      return source2.pipe(
        takeUntil(source1)
      );
    }),
    concat(source1)
).subscribe(val => console.log(val));

这几乎可行。如果source1在2秒后没有发射,它会从source2发射,直到source1再次发射,然后切换到source1。但是有两个主要缺陷:

  1. source1再次发射时,第一个发射值被takeUntil“抓住”(source1是一个热的观察者)并且不会在concat(source1)
  2. 如果source1第二次停止发射,我想有相同的行为。通过我的实现,它只能工作一次。

知道如何解决这个问题吗?

javascript typescript rxjs rxjs6
2个回答
2
投票

我想你可以通过共享source1然后使用repeat重新订阅到同一个链(我没有测试它)来做到这一点:

const shared1 = source1.pipe(share());

source1.pipe(
  timeout(2000),
  catchError(() => merge(source1, source2).pipe(
    takeUntil(source1),
  )),
  repeat(),
).subscribe(val => console.log(val));

1
投票

我发现解决我的第1点和第2点的解决方案如下

const source1HasStopped = source1.pipe(
  timeout(2000),
  catchError(() => of(1))
);

const fallback = source2.pipe(
  skipUntil(source1HasStopped),
  takeUntil(source1),
  repeat()
);

merge(source1, fallback).subscribe(console.log);

编辑:不幸的是,这会造成订阅泄漏,因为takeUntil不是最后的......

© www.soinside.com 2019 - 2024. All rights reserved.