我想计算 Rxjs 中
Observable<{v: Observable<number|undefined>}[]>
的总和。
总和应始终反映当前状态(即从列表中删除项目或编辑新添加/现有项目时更新。
我提出了以下解决方案,这似乎可以满足我的要求,但我不完全理解它为什么有效,并且不确定它是否可以改进:
myObservable.pipe(
switchScan((acc, curr) => {
return combineLatest(curr.map((a) => a.v)).pipe(
map((values) => {
let sum: number | undefined;
for (const v of values) {
if (v === undefined) {
continue;
}
if (sum === undefined) {
sum = 0;
}
sum += v;
}
return sum;
}),
);
}, 0),
).subscribe((sum) => {(sum) => {console.log(sum)})
为了解释它,我使用以下可观察的结构:
private myObservable = of([
{ v: of(1) },
{ v: of(2) },
{ v: of(undefined) }
]);
因此
switchScan
运算符将归约函数应用于 myObservable
。减速器函数采用一个初始值,在您的情况下为 0,并采用 myObservable
的内容,即
[{ v: of(1) }, { v: of(2) },{ v: of(undefined) }]
接下来,您使用
combineLatest
,它接受一个可观察值数组,等待每个可观察值发出一个值并从中创建一个新的可观察值。您正确映射可观察量以获得
[of(1), of(2), of(undefined)]
这是您想要使用的格式
combineLatest
。由于所有可观察量都持有一个初始值,因此它会立即发出,然后您会看到 map
运算符,它现在会生成一个数字数组和 undefined
[1, 2, undefined]
现在,您使用 for 循环计算可观察量的总和并返回总和。在我的例子中是 3。
this.myObservable
.pipe(
switchMap((obs) => {
const nestedObs = obs.map((a) => a.v);
return forkJoin(nestedObs).pipe(
map((val) => {
return val.reduce(
(accumulator: number, currentValue: number | undefined) =>
accumulator + (currentValue || 0),
0
);
})
);
})
)
.subscribe((sum) => console.log('sum', sum));
您可以只使用
switchScan
,而不是使用 switchMap
,它会切换到内部可观察量并对每个发出的值进行修改。在您的情况下,由于只有一个数组,因此这将是上面的 switchScan
中的可观察数组。现在您可以像以前一样映射可观察数组。
接下来你可以使用
forkJoin
,它组合了一个可观察值数组,并按照数组中可观察值的顺序发出一个值数组。
[1, 2, undefined]
不,您可以应用减速函数并计算数组中值的总和。由于您也可以有未定义的值,因此您必须添加
||
运算符,它基本上检查 currentValue
是否未定义或为 null,否则使用 0。