我正在使用 nestjs 创建一些 微服务。
例如,我有 x、y 和 z 服务全部由 grpc 互连,但我希望 service x 在特定实体更改时将更新发送到 webapp 所以我有 考虑服务器发送-事件[对任何其他更好的解决方案持开放态度].
按照nestjs 文档,他们有一个函数在 n 间隔运行 sse 路由,似乎资源耗尽。有没有一种方法可以在有更新时实际发送事件。
假设我在同一服务中有另一个 api 调用, 由另一个 web 应用程序上的按钮单击 触发,我如何触发事件仅在单击按钮时触发,而不是连续发送事件。另外,如果您知道任何惯用的方法来实现这一目标,那么我们将不胜感激,希望它是最后的手段。
我也考虑过MQTT发送事件。但我感觉单一服务不可能同时拥有MQTT和gRPC。我对使用 MQTT 持怀疑态度,因为它的延迟以及它将如何影响内部消息传递。如果我可以限制为外部客户端,那就太好了(即,使用 gRPC 进行内部连接的 x 服务和用于 webapp 的 MQTT 只需要一个由 mqtt 公开的路由)。 (PS 我是微服务的新手 所以请全面了解您的解决方案 :p)
可以。重要的是,在 NestJS 中,
SSE
是用 Observables 实现的,所以只要你有一个可以添加的 observable,就可以用它来发回 SSE 事件。最简单的方法是使用 Subject
s。我曾经在某个地方有过这样的例子,但一般来说,它看起来像这样
@Controller()
export class SseController {
constructor(private readonly sseService: SseService) {}
@SSE()
doTheSse() {
return this.sseService.sendEvents();
}
}
@Injectable()
export class SseService {
private events = new Subject();
addEvent(event) {
this.events.next(event);
}
sendEvents() {
return this.events.asObservable();
}
}
@Injectable()
export class ButtonTriggeredService {
constructor(private readonly sseService: SseService) {}
buttonClickedOrSomething() {
this.sseService.addEvent(buttonClickedEvent);
}
}
请原谅上面的伪代码性质,但总的来说它确实展示了如何使用 Subjects 为 SSE 事件创建可观察对象。只要
@SSE()
端点返回一个具有正确形状的可观察对象,你就是金色的。
NestJS 的 SSE 有更好的处理事件的方式:
请查看带有代码示例的回购协议:
https://github.com/ningacoding/nest-sse-bug/tree/main/src
基本上你有服务的地方:
import {Injectable} from '@nestjs/common';
import {fromEvent} from "rxjs";
import {EventEmitter} from "events";
@Injectable()
export class EventsService {
private readonly emitter = new EventEmitter();
subscribe(channel: string) {
return fromEvent(this.emitter, channel);
}
emit(channel: string, data?: object) {
this.emitter.emit(channel, {data});
}
}
很明显,channel可以是任意字符串,推荐使用路径样式。
例如:“events/for/
与@UseGuards等完全兼容:)
附加说明:由于已知错误,请勿在 EventsService 中注入任何服务。
是的,这是可能的,我们可以使用事件发射器代替间隔。 每当事件发出时,我们都可以将响应发送回客户端。
notification.controller.ts
import { Public } from 'src/decorators';
import { Observable } from 'rxjs';
import { FastifyReply } from 'fastify';
import { NotificationService } from './notification.service';
import { Sse, Controller, Res } from '@nestjs/common';
@Public()
@Controller()
export class NotificationController {
constructor(private notificationService: NotificationService) {}
@Sse('notifications')
async sendNotification(@Res() reply: FastifyReply): Promise<Observable<any>> {
return await this.notificationService.handleConnection();
}
}
notification.service.ts
import { Injectable } from '@nestjs/common';
import { Subject } from 'rxjs';
@Injectable()
export class NotificationService {
notificationEvent: Subject<any> = new Subject();
async handleConnection() {
setInterval(() => {
this.notificationEvent.next({ data: { message: 'Hello World' } });
}, 1000);
return this.notificationEvent.asObservable();
}
}
@Sse('sse-endpoint')
sse(): Observable<any> {
//data have to strem
const arr = ['d1','d2', 'd3'];
return new Observable((subscriber) => {
while(arr.len){
subscriber.next(arr.pop()); // data have to return in every chunk
}
if(arr.len == 0) subscriber.complete(); // complete the subscription
});
}