哟!我在我的 nestjs 应用程序中编写了一个 SSE 服务,它看起来像这样(不要打我,我是第一次这样做):
@Injectable({ scope: Scope.TRANSIENT })
export class SseService {
private readonly _observables = new Map<string, Subject<string>>();
getObservablesByIds(ids: string[]): Observable<string> {
const observables: Observable<string>[] = []
let observable;
for (const id of ids) {
observable = this._observables.get(id)
if (!observable) {
observable = new Subject<string>()
this._observables.set(id, observable);
}
observables.push(observable);
}
return merge(...observables).pipe();
}
emit(id: string, action: string, data: unknown): void {
const observable = this._observables.get(id);
if (observable) {
observable.next(JSON.stringify({ [action]: data }))
}
}
}
它是这样工作的:应用程序中有许多实体,我会观察到这些实体的更新。我为每个实体创建一个 rxjs Subject 并将其映射到“_observables”中。
假设这是一个博客,我会在更新帖子时发送 SSE。
有什么方法:
GET '/stream'
调用这个mwethod。;问题: 假设我创建了一个帖子:
const res = this.postsService.create()
this.sseService.emit(res.id, 'created', res)
现在这个“发射”无处可去——没有人订阅这个新的实体事件。这实际上没关系 - 你不能订阅不存在的东西。
我(作为创作者)再次点击
GET /stream
,现在传递一个新实体的 id 以获取更新。
但是其他正在看博客的人不会订阅——他们不知道有新帖子被创建,他们也没有 id 可以订阅。
问题的第二部分是,我可能需要过滤 SSE 事件,以防某些用户受到限制(比如他们看不到私人帖子的更新)。我实际上不知道该怎么做。