我需要使用 rxjs 运算符找到解决方案。我正在使用 ngrx 开发一个角度应用程序,我想等待 ngrx 选择器填充。
我有一个通知效果,我调用一个 sse 服务,我需要等待管道,直到可观察的“Firm$”返回一个非空数组。我该如何解决这个问题?
loadEVents$ = createEffect(() =>
this.actions$.pipe(
ofType('[WebSocketNotification] Load WebSocketNotification'),
mergeMap(() =>
this.webSocketEventsService.getServerSentEvent(`${environment.SERVER_API_URL}/services/synbee-api-events/api/notification`).pipe(
filter((notif: Notification) => notif.notificationType !== 'OK' && notif.seen === false),
map(res => (this.notification = res)),
switchMap(() => this.Firm$),
WAITUNTIL(res => res.length > 0),
map(()=> {
if (this.notification.notificationType === 'SHARING') {
this.Firm$.pipe(take(1)).subscribe(firms => {
if (firms.find(firm => firm.id === this.notification.idFirm) === undefined) {
this.serviceFirm.getFirms();
}
});
}
return { type: notificationActions.ActionNotificationType.LOAD_NOTIFICATION_SUCCESS, notification: this.notification };
}),
catchError(() => {
return EMPTY;
})
)
)
)
);
我尝试使用重复和过滤运算符,但它中断了所有通量。
在您的代码中
WAITUNTIL
可以替换为简单的 filter
。我不确定你所说的“它中断了所有的通量”是什么意思,因为你经过的下游没有任何东西。您尝试使用 map
来分配不必要的通知。 tap
通常是您想要用于副作用的东西,但您稍后需要它。放下那行并执行此操作
switchMap(notification => this.Firm$.pipe(firm =>
filter(firm => firm.length > 0),
map(firm => ({firm, notification}))
)),
请注意,您的
map
现在将收到这些确认和通知,因此您可以撤消额外的 take(1).subscribe
逻辑并将全局变量删除为类似
map(({firm, notification}) => {
if (notification.notificationType === 'SHARING') {
if (firms.find(firm => firm.id === notification.idFirm) === undefined) {
this.serviceFirm.getFirms();
}
}
return { type: notificationActions.ActionNotificationType.LOAD_NOTIFICATION_SUCCESS, notification: this.notification };
}),