我们可以在 nestjs 中使用服务器发送的事件而不使用间隔吗?

问题描述 投票:0回答:4

我正在使用 nestjs 创建一些 微服务

例如,我有 xyz 服务全部由 grpc 互连,但我希望 service x 在特定实体更改时将更新发送到 webapp 所以我有 考虑服务器发送-事件[对任何其他更好的解决方案持开放态度].

按照nestjs 文档,他们有一个函数在 n 间隔运行 sse 路由,似乎资源耗尽。有没有一种方法可以在有更新时实际发送事件。

假设我在同一服务中有另一个 api 调用, 由另一个 web 应用程序上的按钮单击 触发,我如何触发事件仅在单击按钮时触发,而不是连续发送事件。另外,如果您知道任何惯用的方法来实现这一目标,那么我们将不胜感激,希望它是最后的手段。

[奖金问题]

我也考虑过MQTT发送事件。但我感觉单一服务不可能同时拥有MQTT和gRPC。我对使用 MQTT 持怀疑态度,因为它的延迟以及它将如何影响内部消息传递。如果我可以限制为外部客户端,那就太好了(即,使用 gRPC 进行内部连接的 x 服务和用于 webapp 的 MQTT 只需要一个由 mqtt 公开的路由)。 (PS 我是微服务的新手 所以请全面了解您的解决方案 :p)

提前感谢阅读到最后!

microservices mqtt nestjs server-sent-events nestjs-gateways
4个回答
14
投票

可以。重要的是,在 NestJS 中,

SSE
是用 Observables 实现的,所以只要你有一个可以添加的 observable,就可以用它来发回 SSE 事件。最简单的方法是使用
Subject
s
。我曾经在某个地方有过这样的例子,但一般来说,它看起来像这样

@Controller()
export class SseController {
  constructor(private readonly sseService: SseService) {}

  @SSE()
  doTheSse() {
    return this.sseService.sendEvents();
  }
}
@Injectable()
export class SseService {
  private events = new Subject();

  addEvent(event) {
    this.events.next(event);
  }

  sendEvents() {
    return this.events.asObservable();
  }
}
@Injectable()
export class ButtonTriggeredService {
  constructor(private readonly sseService: SseService) {}

  buttonClickedOrSomething() {
    this.sseService.addEvent(buttonClickedEvent);
  }
}

请原谅上面的伪代码性质,但总的来说它确实展示了如何使用 Subjects 为 SSE 事件创建可观察对象。只要

@SSE()
端点返回一个具有正确形状的可观察对象,你就是金色的。


13
投票

NestJS 的 SSE 有更好的处理事件的方式:

请查看带有代码示例的回购协议:

https://github.com/ningacoding/nest-sse-bug/tree/main/src

基本上你有服务的地方:

import {Injectable} from '@nestjs/common';
import {fromEvent} from "rxjs";
import {EventEmitter} from "events";

@Injectable()
export class EventsService {

    private readonly emitter = new EventEmitter();

    subscribe(channel: string) {
        return fromEvent(this.emitter, channel);
    }

    emit(channel: string, data?: object) {
        this.emitter.emit(channel, {data});
    }

}

很明显,channel可以是任意字符串,推荐使用路径样式。

例如:“events/for/”和订阅该频道的用户将只收到该频道的事件,并且只有在被解雇时才会收到;)-

与@UseGuards等完全兼容:)

附加说明:由于已知错误,请勿在 EventsService 中注入任何服务。


0
投票

是的,这是可能的,我们可以使用事件发射器代替间隔。 每当事件发出时,我们都可以将响应发送回客户端。

notification.controller.ts

import { Public } from 'src/decorators';
import { Observable } from 'rxjs';
import { FastifyReply } from 'fastify';
import { NotificationService } from './notification.service';

import { Sse, Controller, Res } from '@nestjs/common';

@Public()
@Controller()
export class NotificationController {
  constructor(private notificationService: NotificationService) {}
  @Sse('notifications')
  async sendNotification(@Res() reply: FastifyReply): Promise<Observable<any>> {
    return await this.notificationService.handleConnection();
  }
}

notification.service.ts

import { Injectable } from '@nestjs/common';
import { Subject } from 'rxjs';

@Injectable()
export class NotificationService {
  notificationEvent: Subject<any> = new Subject();
  async handleConnection() {
    setInterval(() => {
      this.notificationEvent.next({ data: { message: 'Hello World' } });
    }, 1000);
    return this.notificationEvent.asObservable();
  }
}
  • 这里我使用了 setInterval,只是为了发出一个新事件。您可以在代码中的任何地方发出新事件,而无需使用 setInterval 方法。
  • 我发现这种方法有一些问题,我分享了下面的链接,也检查下面的链接。 sse in nestJs

-1
投票
  @Sse('sse-endpoint')
  sse(): Observable<any> {
    //data have to strem
    const arr = ['d1','d2', 'd3']; 
    return new Observable((subscriber) => {
        while(arr.len){
            subscriber.next(arr.pop()); // data have to return in every chunk
        }
        if(arr.len == 0) subscriber.complete(); // complete the subscription
    });
  }
© www.soinside.com 2019 - 2024. All rights reserved.