为了让事情更容易说明,我有一个组合的 Observable
combinedNotifications$
,我将其绑定到异步管道。对于第一次加载它效果很好。然而。我希望能够手动更新notifications$
。到目前为止,我从未深入研究可观察量,因为迄今为止的用例甚至比这个更简单。
combinedNotifications$: Observable<any> = combineLatest([
this.notifications$,
this.notificationTypes$
]).pipe(
map((i: any) => ({
tableData: i[0],
notificationTypes: i[1]
})),
map((f: any) => {
return ({
tableData: {
...f.tableData, content: f.tableData.content.map((v: any) => ({
...v,
type: f.notificationTypes.eventTypes.find((x: any) => x.value === v.type)?.label,
cause: f.notificationTypes.causeTypes.find((x: any) => x.value === v.cause)?.label
}))
},
notificationTypes: f.notificationTypes
})
}),
tap((d) => console.log(d)),
shareReplay(1)
);
我有一个通过分页与之交互的表格。在创建这个组合的可观察对象之前,我过去只能更新通知:
retrieveNotificationsPage(queryArgs: Pageable = {}, searchData: any = {}) {
this._loading$.next(true);
const actionAuditLogsPage$ = this.apiService
.retrieveNotificationsView$(queryArgs, { ...searchData })
.pipe(
delay(300),
tap({
complete: () => {
this._loading$.next(false);
},
error: (err) => {
this._loading$.next(false);
throw err;
}
}),
catchError((err) => of({ error: err })),
// shareReplay()
);
return actionAuditLogsPage$;
}
然后被称为有点像这样......
loadRecords(event: LazyLoadEvent) {
...
this.notifications$ = this.retrieveNotificationsPage({
page: isNaN(selectedPage) ? 0 : selectedPage,
...(maxPageRowSize != null ? { size: maxPageRowSize } : {})
}, this.searchForm.value);
}
所以我想我可能可以通过合并来做到这一点,但它似乎只是陷入了无限循环......
this.combinedNotifications$ = merge(notificationPage$, this.combinedNotifications$);
您似乎想要手动更新
this.notifications$
,并确保 combinedNotifications$
反映这些更改,而不会影响 combinedNotifications$
由于 notifications$
和 notificationTypes$
的更改而更新时的原始行为。
要实现此目的,您可以使用
startWith
运算符发出 combinedNotifications$
的初始值,然后使用 merge
运算符将手动更新流与原始 combinedNotifications$
流合并。然后,您可以使用 distinctUntilChanged
运算符来确保仅发出不同的值,以避免无限循环。
import { merge, of } from 'rxjs';
import { startWith, distinctUntilChanged, switchMap } from 'rxjs/operators';
// Assume notificationPage$ is the observable returned by retrieveNotificationsPage
// Manual update trigger
const manualUpdate$ = new Subject<void>();
// Observable to handle manual updates
const manualUpdateNotifications$ = manualUpdate$.pipe(
switchMap(() => this.retrieveNotificationsPage())
);
// Combine manual updates with original notifications$
const updatedNotifications$ = merge(
manualUpdateNotifications$,
this.notifications$.pipe(
// Filter out initial emission
startWith(null),
// Distinct until changed to prevent infinite loops
distinctUntilChanged()
)
);
// Recreate combinedNotifications$ using the updated notifications$
this.combinedNotifications$ = combineLatest([
updatedNotifications$,
this.notificationTypes$
]).pipe(
map((i: any) => ({
tableData: i[0],
notificationTypes: i[1]
})),
map((f: any) => ({
tableData: {
...f.tableData,
content: f.tableData.content.map((v: any) => ({
...v,
type: f.notificationTypes.eventTypes.find((x: any) => x.value === v.type)?.label,
cause: f.notificationTypes.causeTypes.find((x: any) => x.value === v.cause)?.label
}))
},
notificationTypes: f.notificationTypes
})),
tap((d) => console.log(d)),
shareReplay(1)
);
// Function to trigger manual updates
triggerManualUpdate() {
manualUpdate$.next();
}
通过此设置,
this.combinedNotifications$
将反映自动更新的更改(由于 notifications$
和 notificationTypes$
的更改以及通过调用 triggerManualUpdate()
触发的手动更新)。每个更新都将是不同的,从而防止无限循环。