我有很多价值
[ 0.5, 0.3, 0.4, 0.6, 1.4, 0.3, 0.6 ]
并且我想把它变成
[ 1 2 1 ]
因此,我们将累加第一个流的值,直到达到整数(至少为1),然后在累积剩余量的同时发出整数。
这完全让我感到困惑,并认为使用switchMap可以解决一个问题。
这是我的方法:
src$ = src$.pipe(publish());
const wholeNumber$ = src$.pipe(
scan(
(acc, crt) => (acc | 0) > 1 ? crt + (+(acc - (acc | 0)).toPrecision(1)) : acc + crt, 0
),
map(v => (v | 0)),
filter(v => v >= 1),
);
src$.pipe(
buffer(wholeNumber$)
).subscribe();
publish
将确保未多次订阅源。它也是multicast(new Subject())
的简称,基本上是一种多播源的方法。为了使其正常工作,src$
必须异步发出,以使使用中的Subject
正确注册其订户(wholeNumber$
和另一个)。
如果源不异步发出,则可以使用src$.pipe(observeOn(asapScheduler))
force这样做,它将把每个通知安排为一个承诺。
让我们仔细看一下scan
提供的cb:
(acc, crt) => (acc | 0) > 1 ? crt + (+(acc - (acc | 0)).toPrecision(1)) : acc + crt`
[number | 0
与Math.trunc(number)
相同。
在+(acc - (acc | 0)).toPrecision(1)
中:
1.2 - 1
时,您会得到:0.199...96
;其中toPrecision(1)
:(1.2 - 1).toPrecision(1)
= "0.2"
。 +
将得到0.2
作为数字。谢谢安德烈,
我已经使用了您的代码并将其更改为自定义运算符。
export const bufferAmount = (
amount: number
): MonoTypeOperatorFunction<number> => (
source: Observable<number>
): Observable<number> =>
new Observable<number>((observer) => {
let bufferSum = 0;
return source.subscribe({
next(value) {
bufferSum += value;
if (bufferSum < amount) return;
const nextSum = Math.trunc(bufferSum);
bufferSum -= nextSum;
observer.next(nextSum);
},
error(error) {
observer.error(error);
},
complete() {
observer.complete();
},
});
});
ticker.pipe(bufferValues(1)).subscribe();