RXJS bufferedAmount /累加值/复位后减少

问题描述 投票:2回答:2

我有很多价值

[ 0.5, 0.3, 0.4, 0.6, 1.4, 0.3, 0.6 ]

并且我想把它变成

[           1         2         1 ]

因此,我们将累加第一个流的值,直到达到整数(至少为1),然后在累积剩余量的同时发出整数。

这完全让我感到困惑,并认为使用switchMap可以解决一个问题。

typescript rxjs rxjs6
2个回答
2
投票

这是我的方法:

src$ = src$.pipe(publish());

const wholeNumber$ = src$.pipe(
  scan(
    (acc, crt) => (acc | 0) > 1 ? crt + (+(acc - (acc | 0)).toPrecision(1)) : acc + crt, 0
  ),
  map(v => (v | 0)),
  filter(v => v >= 1),
);

src$.pipe(
  buffer(wholeNumber$)
).subscribe();

publish将确保未多次订阅源。它也是multicast(new Subject())的简称,基本上是一种多播源的方法。为了使其正常工作,src$必须异步发出,以使使用中的Subject正确注册其订户(wholeNumber$和另一个)。

如果源不异步发出,则可以使用src$.pipe(observeOn(asapScheduler)) force这样做,它将把每个通知安排为一个承诺。

让我们仔细看一下scan提供的cb:

(acc, crt) => (acc | 0) > 1 ? crt + (+(acc - (acc | 0)).toPrecision(1)) : acc + crt`

[number | 0Math.trunc(number)相同。

+(acc - (acc | 0)).toPrecision(1)中:

  • [当您执行1.2 - 1时,您会得到:0.199...96;其中toPrecision(1)(1.2 - 1).toPrecision(1) = "0.2"+将得到0.2作为数字。

0
投票

谢谢安德烈,

我已经使用了您的代码并将其更改为自定义运算符。


export const bufferAmount = (
  amount: number
): MonoTypeOperatorFunction<number> => (
  source: Observable<number>
): Observable<number> =>
  new Observable<number>((observer) => {
    let bufferSum = 0;
    return source.subscribe({
      next(value) {
        bufferSum += value;
        if (bufferSum < amount) return;
        const nextSum = Math.trunc(bufferSum);
        bufferSum -= nextSum;
        observer.next(nextSum);
      },
      error(error) {
        observer.error(error);
      },
      complete() {
        observer.complete();
      },
    });
  });

ticker.pipe(bufferValues(1)).subscribe();

© www.soinside.com 2019 - 2024. All rights reserved.