如何手动更新组合 Observable 的一部分?

问题描述 投票:0回答:1

为了让事情更容易说明,我有一个组合的 Observable

combinedNotifications$
,我将其绑定到异步管道。对于第一次加载它效果很好。然而。我希望能够手动更新
notifications$
。到目前为止,我从未深入研究可观察量,因为迄今为止的用例甚至比这个更简单。

  combinedNotifications$: Observable<any> = combineLatest([
    this.notifications$,
    this.notificationTypes$
  ]).pipe(
    map((i: any) => ({
      tableData: i[0],
      notificationTypes: i[1]
    })),
    map((f: any) => {
      return ({
        tableData: {
          ...f.tableData, content: f.tableData.content.map((v: any) => ({
            ...v,
            type: f.notificationTypes.eventTypes.find((x: any) => x.value === v.type)?.label,
            cause: f.notificationTypes.causeTypes.find((x: any) => x.value === v.cause)?.label
          }))
        },
        notificationTypes: f.notificationTypes
      })
    }),
    tap((d) => console.log(d)),
    shareReplay(1)
  );

我有一个通过分页与之交互的表格。在创建这个组合的可观察对象之前,我过去只能更新通知:

retrieveNotificationsPage(queryArgs: Pageable = {}, searchData: any = {}) {
    this._loading$.next(true);
    const actionAuditLogsPage$ = this.apiService
      .retrieveNotificationsView$(queryArgs, { ...searchData })
      .pipe(
        delay(300),
        tap({
          complete: () => {
            this._loading$.next(false);
          },
          error: (err) => {
            this._loading$.next(false);
            throw err;
          }
        }),
        catchError((err) => of({ error: err })),
        // shareReplay()
      );
    return actionAuditLogsPage$;
  }

然后被称为有点像这样......

 loadRecords(event: LazyLoadEvent) {
    ...
    this.notifications$ = this.retrieveNotificationsPage({
      page: isNaN(selectedPage) ? 0 : selectedPage,
      ...(maxPageRowSize != null ? { size: maxPageRowSize } : {})
    }, this.searchForm.value);
  }

所以我想我可能可以通过合并来做到这一点,但它似乎只是陷入了无限循环......

this.combinedNotifications$ = merge(notificationPage$, this.combinedNotifications$);
angular rxjs
1个回答
0
投票

您似乎想要手动更新

this.notifications$
,并确保
combinedNotifications$
反映这些更改,而不会影响
combinedNotifications$
由于
notifications$
notificationTypes$
的更改而更新时的原始行为。

要实现此目的,您可以使用

startWith
运算符发出
combinedNotifications$
的初始值,然后使用
merge
运算符将手动更新流与原始
combinedNotifications$
流合并。然后,您可以使用
distinctUntilChanged
运算符来确保仅发出不同的值,以避免无限循环。

import { merge, of } from 'rxjs';
import { startWith, distinctUntilChanged, switchMap } from 'rxjs/operators';

// Assume notificationPage$ is the observable returned by retrieveNotificationsPage

// Manual update trigger
const manualUpdate$ = new Subject<void>();

// Observable to handle manual updates
const manualUpdateNotifications$ = manualUpdate$.pipe(
  switchMap(() => this.retrieveNotificationsPage())
);

// Combine manual updates with original notifications$
const updatedNotifications$ = merge(
  manualUpdateNotifications$,
  this.notifications$.pipe(
    // Filter out initial emission
    startWith(null),
    // Distinct until changed to prevent infinite loops
    distinctUntilChanged()
  )
);

// Recreate combinedNotifications$ using the updated notifications$
this.combinedNotifications$ = combineLatest([
  updatedNotifications$,
  this.notificationTypes$
]).pipe(
  map((i: any) => ({
    tableData: i[0],
    notificationTypes: i[1]
  })),
  map((f: any) => ({
    tableData: {
      ...f.tableData,
      content: f.tableData.content.map((v: any) => ({
        ...v,
        type: f.notificationTypes.eventTypes.find((x: any) => x.value === v.type)?.label,
        cause: f.notificationTypes.causeTypes.find((x: any) => x.value === v.cause)?.label
      }))
    },
    notificationTypes: f.notificationTypes
  })),
  tap((d) => console.log(d)),
  shareReplay(1)
);

// Function to trigger manual updates
triggerManualUpdate() {
  manualUpdate$.next();
}

通过此设置,

this.combinedNotifications$
将反映自动更新的更改(由于
notifications$
notificationTypes$
的更改以及通过调用
triggerManualUpdate()
触发的手动更新)。每个更新都将是不同的,从而防止无限循环。

© www.soinside.com 2019 - 2024. All rights reserved.