RxJs使用takeUntil间隔发布最后一个值

问题描述 投票:0回答:2

我有一些代码,轮询直到任务完成

见下文

this.simulationStatus =
  interval(2000).pipe(
    switchMap(
      () => from(this.simulationService.getSimulationStatus(this.route.snapshot.paramMap.get('jobId')))),
    takeUntil(this.stopPoll),
    tap(simulation => {
      if (simulation && simulation.complete) {
        if (this.stopCount == 1) {
          // Get once after complete
          this.stopPoll.next(true);
        }
        this.stopCount++;
      }
    })
  );

我已经尝试过使用takeUntil并且尽管问题是,任务完成后,最后一个值永远不会发布。

为了解决这个问题,我必须将tap方法包含在stopPoll主题中,并递增stopCount以获取最后一个值。

所以上面的工作但是感觉有点乱,我敢肯定必须有更好的方法来实现这个目标吗?

我本来期望takeUntil发布最后一个值或者有一个覆盖来告诉它例如takeUntil(observable,{publishLast:true})

BTW更新,Obularable由Angular 6模板订阅,提前感谢

angular6 rxjs6 rxjs-pipeable-operators
2个回答
1
投票

你可以做的一件事是使用像这样的自定义takeWhile类操作符:

const completeWith = <T>(predicate: (arg: T) => boolean) => (
  source: Observable<T>,
) =>
  new Observable<T>(observer =>
    source.subscribe(
      value => {
        observer.next(value);
        if (predicate(value)) {
          observer.complete();
        }
      },
      error => observer.error(error),
      () => observer.complete(),
    ),
  );

将它视为takeWhite的变体似乎不是一个好主意,因为它不仅仅是在条件成立时获取值,而且还会发出额外的值。

可能更优雅的解决方案是使模拟状态可观察发出两种值:下一个通知和完成通知,类似于操作符的实现/取消实现的工作方式。


1
投票

如果要完成observable,还可以使用next()创建主题和发射。

this.stopPoll: Subject<any> = new Subject<any>();

如果您想完成订阅。你可以调用this.stopPoll.next(true);

你可以访问subscribe()中的数据

this.simulationStatus.subscribe(success=>{}, failure=>{}, complete=>{});
© www.soinside.com 2019 - 2024. All rights reserved.