为什么我的 RxJS Observable 链在后续发射时不重新执行?

问题描述 投票:0回答:1

我正在 Angular 服务中使用 RxJS,并且我的一个可观察链遇到了一个特殊问题。具体来说,我有两个可观察链,旨在用用户数据丰富通知流。第一个可观察链正确记录并映射结果,并且可以多次触发而不会出现问题。然而,第二个 observable 链包含一个

concatMap
运算符,用于将通知与用户数据组合并丰富,仅执行一次,并且不会响应源 observable 的后续发射。

这是按预期工作并允许重复执行的可观察链:

private enriched_sns$ = this.sn_results$.asObservable().pipe(
  tap((results) => console.log('%cResults', 'color: cyan', results)),
  concatMap((results) => {
    console.log('before', results);
    return results;
  }),
  tap((results) => console.log('after', results)),
  map((results) => results),
);

这是有问题的可观察链,仅运行一次:

private enriched_sns$ = this.sn_results$.asObservable().pipe(
  tap((results) => console.log('%cResults', 'color: cyan', results)),
  concatMap((results) => {
    console.log('before', results);
    const enrichedNotifications$ = zip(
      results.map((result: StreamNotificationResult) => {
        return this.read_user_data_service.readUserData(result.activities[0].actor.id).pipe(
          map((user) => {
            const enriched_notification: StreamEnrichedNotification = {
                user,
                notification: result,
            };
            return enriched_notification;
          }),
        );
      }),
    );
    // Combine the original results and the enriched notifications
    return combineLatest([of(results), enrichedNotifications$]);
  }),
  tap((results) => console.log('after', results)),
  map((results) => results),
);

我怀疑这个问题可能与我如何使用

concatMap
或可能与可观察值的组合和返回方式有关,但我不确定为什么它只执行一次而不对
this.sn_results$
的后续发射做出反应。

有人遇到过类似的问题或者可以找出可能导致此行为的原因吗?任何有关如何确保第二个可观察链对来自可观察源的每次发射做出反应的见解或建议将不胜感激。

angular typescript rxjs
1个回答
0
投票

我发现

readUserData()
在下次发射时没有返回任何内容的原因是因为我使用的是
combineLatest()
而不是
zip()
。切换到
zip()
后,我能够接收第二次发射的数据,因为
zip()
会等待所有可观察对象完成后再返回结果。

private enriched_sns$ = this.sn_results$.asObservable().pipe(
  tap((results) => console.log('%cResults', 'color: cyan', results)),
  concatMap((results) => {
    console.log('before', results);
    const enrichedNotifications$ = zip(
      results.map((result: StreamNotificationResult) => {
        return this.read_user_data_service.readUserData(result.activities[0].actor.id).pipe(
          map((user) => {
            const enriched_notification: StreamEnrichedNotification = {
                user,
                notification: result,
            };
            return enriched_notification;
          }),
        );
      }),
    );
    // Combine the original results and the enriched notifications
    return zip([of(results), enrichedNotifications$]); // Now it will wait for all observable to complete
  }),
  tap((results) => console.log('after', results)),
  map((results) => results),
);
© www.soinside.com 2019 - 2024. All rights reserved.