我有一个会定期推送数据的服务器,我想在自己的 REST API 中使用这些数据,应该如何获取?每当外部源推送数据时,我的处理逻辑都需要被自动调用。我已经尝试过以下代码,但它不起作用。
var EventSource = require("eventsource");
var url = "..." // Source URL
var es = new EventSource(url);

// Handles every unnamed event pushed by the server. Events sent with an
// explicit `event:` name are NOT delivered here; subscribe to those with
// es.addEventListener('<event name>', handler) instead.
es.onmessage = (event) => {
  console.log(event);
  let parsedData;
  try {
    parsedData = JSON.parse(event.data);
  } catch (err) {
    // A single non-JSON payload must not throw out of the event handler.
    console.error('Could not parse SSE payload as JSON:', err);
    return;
  }
  console.log(parsedData);
};

// Without an error handler, connection failures (bad URL, refused
// connection, non-200 response) are silent and the stream simply appears
// to "not work".
es.onerror = (err) => {
  console.error('EventSource error:', err);
};
我在从另一个微服务消费 SSE 时也遇到过同样的问题;采用下面的方法后,问题得以解决。
node.js 文件
const eventSource = require('eventsource');
async socEventStream(req, res) {
// list of the event you want to consume
const list = ['EVENT1_NAME', 'EVENT2_NAME','EVENT3_NAME'];
try {
const e = new eventSource('url//of_sse_event', {});
for (const l of list) {
e.addEventListener(l, (e) => {
const data = e.data;
// Your data
console.log('event data =====>',data)
});
};
res.on('close', () => {
for (const l of list) {
e.removeEventListener(l, (e) => {
});
}
})
} catch (err) {
console.log(err)
}
}
如果你想在 Node.js 上消费事件并将其转发给客户端,可以这样写:
const eventSource = require('eventsource');
async socEventStream(req, res) {
// setting express timeout for more 24 hrs
req.setTimeout(24 * 60 * 60 * 1000);
// setting headers for client to send consumed sse to client
const headers = {
'Content-Type': 'text/event-stream',
'Connection': 'keep-alive',
'Cache-Control': 'no-cache',
'Access-Control-Allow-Headers': 'Content-Type, Access-Control-Allow-Headers, Authorization, X-Requested-With,observe,x-access-key',
'Access-Control-Allow-Methods': 'POST, PUT, GET, OPTIONS, DELETE',
'Access-Control-Allow-Origin': '*',
};
res.setTimeout(24 * 60 * 60 * 1000);
res.writeHead(200, headers);
// list of the event you want to consume
const list = ['EVENT1_NAME', 'EVENT2_NAME','EVENT3_NAME'];
try {
const e = new eventSource('url//of_sse_event', {});
for (const l of list) {
e.addEventListener(l, (e) => {
const data = e.data;
// Your data
res.write(`event:${l}\ndata:${data}\n\n`);
});
};
req.on('close', () => {
for (const l of list) {
e.removeEventListener(l, (e) => {
});
}
})
} catch (err) {
console.log(err)
}
}
为了便于测试,可以在服务器上搭建一个类似下面的端点:创建一个带事件名称的流,这样就可以在客户端按该事件名称监听它。
const SseStream = require('ssestream')
// Test endpoint: pipes an SseStream into the response and writes a
// 'server-time' event carrying the current time string once per second
// until the response closes.
app.get('/sse', (req, res) => {
  console.log('new connection');

  const stream = new SseStream(req);
  stream.pipe(res);

  const sendServerTime = () => {
    const payload = {
      event: 'server-time',
      data: new Date().toTimeString(),
    };
    stream.write(payload);
  };
  const timer = setInterval(sendServerTime, 1000);

  // Stop the timer and detach the stream once the response has closed.
  const handleClose = () => {
    console.log('lost connection');
    clearInterval(timer);
    stream.unpipe(res);
  };
  res.on('close', handleClose);
});
在客户端,可以像这样监听事件:
var EventSource = require('eventsource')
var es = new EventSource(url)

// Events sent with an explicit `event:` name are dispatched only to
// listeners registered under that name; a 'message' listener receives
// unnamed events only. The test endpoint in this file emits events named
// 'server-time', so subscribe to that name as well as to 'message'.
for (const eventName of ['message', 'server-time']) {
  es.addEventListener(eventName, function (e) {
    console.log(e.data)
  })
}
也可以尝试 @llm-eaf/node-event-source 这个包:
// Relays an upstream event stream to the caller. `this.res` is the Express
// Response and `this.req` the matching request; `url`, `method` and
// `EventStreamContentType` come from the surrounding scope.
// The response is ended only after the awaited call has settled
// successfully (presumably once the upstream stream finishes; confirm
// against the package documentation).
await nodeEventSource(`https://api.openai.com/v1${url}`, {
  method,
  headers: {
    Authorization: "************",
    "Content-Type": "application/json",
  },
  data: this.req.body,
  // Arrow functions keep `this` bound to the enclosing object.
  onOpen: () => {
    // Set the response content type as soon as the upstream opens.
    this.res.type(EventStreamContentType);
  },
  onMessage: ({ data }) => {
    // Forward each upstream message as one unnamed SSE frame.
    this.res.write(`data: ${data}\n\n`);
    // do something with the message
  },
});
this.res.end();