使用 RxJS 取消队列中的所有项目

问题描述 投票:0回答:1

我有一个名为

ringBell
的函数,它返回一个完成铃声响起的 Promise。我希望能够通过在主题上调用
.next()
来触发铃声,并且我希望铃声排队,所以我会使用
concatMap

那么我应该如何a)单独取消当前铃声和b)取消所有待处理的铃声?

我想我可以让单个取消像这样工作,但我希望它能读起来更干净:

const ringer = new Subject<void>();
const cancels = new Subject<void>();
ringer
  .pipe(
    concatMap(() => {
      return defer(playBellWebAudio).pipe(
        // Allow single-cancelation
        takeUntil(cancels)
      );
    })
  )
  .subscribe();

ringer.next() // Add ring to the queue
cancels.next() // Cancel the current ring playing

但是,我无法像使用

@rxfx/effect
那样在原始 RxJS 中使用队列取消功能。有人可以展示单次和多次取消的惯用 RxJS 吗?

import { createEffect } from '@rxfx/effect'
const bellRinger = createQueuedEffect(ringBell);

bellRinger.cancelCurrent(); // cancels one
bellRinger.cancelCurrentAndQueued(); // also empties the queue

CodeSandbox:https://codesandbox.io/p/sandbox/rxjs-bell-effect-wkg9d3

顺便说一句,这类似于未回答的取消 concatMap 中的内部 Observable

promise rxjs observable
1个回答
0
投票

我认为你当前的代码非常好,你只需要处理整个队列取消,这可以通过多种方式完成,这里有一些例子:

const ringer = new Subject<void>();
const cancels = new Subject<void>();
const restartEntireQueue = new Subject<void>();

restartEntireQueue
  .pipe(
    switchMap(() =>
      ringer.pipe(
        concatMap(() => {
          return defer(playBellWebAudio).pipe(
            // Allow single-cancelation
            takeUntil(cancels)
          );
        })
      )
    )
  )
  .subscribe();

ringer.next(); // Add ring to the queue
cancels.next(); // Cancel the current ring playing

或者也

ringer
  .pipe(
    concatMap(() => {
      return defer(playBellWebAudio).pipe(
        // Allow single-cancelation
        takeUntil(cancels)
      );
    }),
    takeUntil(restartEntireQueue),
    repeat()
  )
© www.soinside.com 2019 - 2024. All rights reserved.