我有一个可观察的丰富_sns$,我通过使用 concatMap 对每个发出的项目执行操作来丰富它,然后使用 toArray() 将结果组合到数组中。但是,我很好奇是否有其他方法可以在不使用 toArray() 的情况下实现相同的结果。
这是我当前的代码:
private enriched_sns$ = toObservable(this.sn_results).pipe(
concatMap((results) => results as StreamNotificationResult[]),
concatMap((notification) => {
const user_id = notification.activities[0].actor.id;
return zip(of(notification), this.read_user_data_service.readUserData(user_id));
}),
map(([notification, user]) => {
return {
notification,
user: user,
};
}),
toArray(),
);
有没有一种方法可以在可观察对象的末尾获取发出的项目数组,而无需诉诸 toArray()?任何帮助或替代方法将不胜感激。谢谢!
在示例中需要使用
toArray()
的原因是因为您要从信号中获取发射的数组,然后单独发射每个元素。
您可以使用
forkJoin
函数来处理订阅一组可观察值,而不是使用 concatMap
一次发出每个元素:
private enriched_sns$ = toObservable(this.sn_results).pipe(
concatMap((results: StreamNotificationResult[]) => forkJoin(
results.map(n => this.read_user_data_service.readUserData(n.activities[0].actor.id).pipe(
map(user => ({ notification: n, user }))
))
)),
);
有一个重要的区别
forkJoin
将同时订阅所有可观察量,而不是一次订阅一个。如果你需要限制并发,你可以查看 rxjs-etc
包,它有一个 forkJoinConcurrent
函数,它允许你指定并发:
private enriched_sns$ = toObservable(this.sn_results).pipe(
concatMap((results: StreamNotificationResult[]) => forkJoinConcurrent(
results.map(n=> this.read_user_data_service.readUserData(n.activities[0].actor.id).pipe(
map(user => ({ notification: n, user }))
))
), 1), // <-- limit to one inner subscription at a time
);