如何在 Angular/RxJS 中不使用 toArray() 的情况下检索 Observable 末尾的数组?

问题描述 投票:0回答:1

我有一个可观察的丰富_sns$,我通过使用 concatMap 对每个发出的项目执行操作来丰富它,然后使用 toArray() 将结果组合到数组中。但是,我很好奇是否有其他方法可以在不使用 toArray() 的情况下实现相同的结果。

这是我当前的代码:

private enriched_sns$ = toObservable(this.sn_results).pipe(
  concatMap((results) => results as StreamNotificationResult[]),
  concatMap((notification) => {
    const user_id = notification.activities[0].actor.id;
    return zip(of(notification), this.read_user_data_service.readUserData(user_id));
  }),
  map(([notification, user]) => {
    return {
      notification,
      user: user,
    };
  }),
  toArray(),
);

有没有一种方法可以在可观察对象的末尾获取发出的项目数组,而无需诉诸 toArray()?任何帮助或替代方法将不胜感激。谢谢!

rxjs reactive declarative angular-signals
1个回答
0
投票

在示例中需要使用

toArray()
的原因是因为您要从信号中获取发射的数组,然后单独发射每个元素。

您可以使用

forkJoin
函数来处理订阅一组可观察值,而不是使用
concatMap
一次发出每个元素:

private enriched_sns$ = toObservable(this.sn_results).pipe(
  concatMap((results: StreamNotificationResult[]) => forkJoin(
    results.map(n => this.read_user_data_service.readUserData(n.activities[0].actor.id).pipe(
      map(user => ({ notification: n, user }))
    ))
  )),
);

有一个重要的区别

forkJoin
将同时订阅所有可观察量,而不是一次订阅一个。如果你需要限制并发,你可以查看
rxjs-etc
包,它有一个
forkJoinConcurrent
函数,它允许你指定并发:

private enriched_sns$ = toObservable(this.sn_results).pipe(
  concatMap((results: StreamNotificationResult[]) => forkJoinConcurrent(
    results.map(n=> this.read_user_data_service.readUserData(n.activities[0].actor.id).pipe(
      map(user => ({ notification: n, user }))
    ))
  ), 1), // <-- limit to one inner subscription at a time
);
© www.soinside.com 2019 - 2024. All rights reserved.