如何在Nodejs中消费来自sse服务器的数据?

问题描述 投票:0回答:3

我有一个定期发送数据的服务器。我想在我的 REST API 中使用这些数据,如何获取它?当从外部源推送数据时需要自动调用它。我已经尝试过以下代码,但它不起作用。

var EventSource = require("eventsource");

var url = "..." // Source URL
    var es =  new EventSource(url);
    es.onmessage = (event) => {
        console.log(event)
        const parsedData = JSON.parse(event.data);
        console.log(parsedData)
    }
javascript node.js express sockets server-sent-events
3个回答
3
投票

当我想从不同的微服务使用 SSE 时,我遇到了同样的问题,我遵循了这种方法,它对我有用。

node.js 文件

const  eventSource = require('eventsource');

  async socEventStream(req, res) {

    // list of the event you want to consume

    const list = ['EVENT1_NAME', 'EVENT2_NAME','EVENT3_NAME'];
    try {
      const e = new eventSource('url//of_sse_event', {});
      for (const l of list) {
        e.addEventListener(l, (e) => {
          const data = e.data;

         // Your data
          console.log('event data =====>',data)
          
        });
      };
      res.on('close', () => {
        for (const l of list) {
          e.removeEventListener(l, (e) => {
          });
        }
      })
    } catch (err) {
      console.log(err)
    }
  }

如果你想在node.js上消费事件并将其发送到客户端那么


const  eventSource = require('eventsource');

  async socEventStream(req, res) {
   // setting express timeout for more 24 hrs
   req.setTimeout(24 * 60 * 60 * 1000);

    // setting headers for client to send consumed sse to client

    const headers = {
      'Content-Type': 'text/event-stream',
      'Connection': 'keep-alive',
      'Cache-Control': 'no-cache',
      'Access-Control-Allow-Headers': 'Content-Type, Access-Control-Allow-Headers, Authorization, X-Requested-With,observe,x-access-key',
      'Access-Control-Allow-Methods': 'POST, PUT, GET, OPTIONS, DELETE',
      'Access-Control-Allow-Origin': '*',
    };
    res.setTimeout(24 * 60 * 60 * 1000);
    res.writeHead(200, headers);

    // list of the event you want to consume

    const list = ['EVENT1_NAME', 'EVENT2_NAME','EVENT3_NAME'];
    try {
      const e = new eventSource('url//of_sse_event', {});
      for (const l of list) {
        e.addEventListener(l, (e) => {
          const data = e.data;

         // Your data
          res.write(`event:${l}\ndata:${data}\n\n`);
          
        });
      };
      req.on('close', () => {
        for (const l of list) {
          e.removeEventListener(l, (e) => {
          });
        }
      })
    } catch (err) {
      console.log(err)
    }
  }


1
投票

为了测试目的,在服务器上设置类似的东西。使用事件名称创建一个流,以便您可以在客户端上监听它。

const SseStream = require('ssestream')

app.get('/sse', (req, res) => {
  console.log('new connection')

  const sseStream = new SseStream(req)
  sseStream.pipe(res)
  const pusher = setInterval(() => {
    sseStream.write({
      event: 'server-time',
      data: new Date().toTimeString()
    })
  }, 1000)

  res.on('close', () => {
    console.log('lost connection')
    clearInterval(pusher)
    sseStream.unpipe(res)
  })
})

在客户端上您监听这样的事件

var EventSource = require('eventsource')
var es = new EventSource(url)
es.addEventListener('message', function (e) {
  console.log(e.data)
})

0
投票

可以尝试这个包@llm-eaf/node-event-source

// this.res is Express Response
await nodeEventSource(`https://api.openai.com/v1${url}`, {
  method: method,
  headers: {
    Authorization: "************",
    "Content-Type": "application/json",
  },
  data: this.req.body,
  onOpen: () => {
    this.res.type(EventStreamContentType);
  },
  onMessage: (ev) => {
    this.res.write(`data: ${ev.data}\n\n`);
    // do something with the message
  },
});
this.res.end();
© www.soinside.com 2019 - 2024. All rights reserved.