这是我的代码的简化版本:
app.post("/test", async(req, res) => {
const browser = await puppeteer.launch({
headless: true, /* I've tried with "new" and false too */
});
page = await browser.newPage();
var desiredUrl = "url here";
await page.goto(desiredUrl);
/* Stream data from the page */
await page.exposeFunction('writeData', (data) => {
console.log("Writing data");
res.write(data);
});
/* End stream */
await page.exposeFunction('endStream', () => {
console.log("End stream");
res.end();
});
await page.evaluate(async ()=>{
var output = await fetch("/endpoint_here", {
"headers": {
/* headers here */
},
});
var reader = output.body.getReader();
while (true) {
var { done, value } = await reader.read();
if (done) {
window.endStream();
return;
}
var decoder = new TextDecoder();
var decodedData = decoder.decode(value);
window.writeData(decodedData);
}
});
})
但是,这行不通。我尝试过的方法如下:
res 在 page.evaluate() 内部不起作用。我尝试过将 res 发送到 page.evaluate() 中,但它破坏了代码。
我尝试使用 page.exposeFunction() 并在那里执行 res.write (和 res.end() ),它有效,但只是第一次。第二次(以及此后的每次)我发送 post 请求时,代码运行正常(它在这些函数中执行了 console.logs ),但它没有执行 res.write() 和 res.end()完全没有。
我什至尝试使用公开的函数让它更新 page.evaluate() 内的全局变量,使用代理检测该变量的变化并执行 res.write() 来写入数据,但这之后也崩溃了第一个帖子请求。
解决这个奇怪问题的唯一方法是重新启动程序,这显然不是一个解决方案。
我还尝试将流数据记录到页面中的控制台,并使用 page.on('console') 将数据 res.write() 返回到客户端。这对于一次一个请求来说非常有效。但是,当对端点“/test”有多个同时请求时,它会将响应写入两个客户端,而不仅仅是发起请求的客户端。
唯一有效的就是在获取结束后返回响应,而不对其进行流式传输。不过,我希望它能被直播。
我被困住了,不知道该怎么办,因此我们将不胜感激。
假设您将
decodedData
传递到 window.writeData(decodedData);
,我无法重现。该问题似乎与您正在访问的端点和/或您的服务器配置有关。我建议分享该信息或尝试构建您自己的复制品。
这是我的复制尝试,希望对您有帮助。如果你运行你可以看到代码工作
$ node -v
v20.11.1
$ npm i
$ node sse-endpoint &
$ node server &
$ curl localhost:3001/stream
data: {"chunk":0}
data: {"chunk":1}
data: {"chunk":2}
data: {"chunk":3}
# ... and so on, streamed every second ...
package.json:
{
"dependencies": {
"express": "^4.19.2",
"puppeteer": "^22.6.0"
}
}
sse-endpoint.js(这是您正在拦截的远程 API 的模拟):
const express = require("express");
const app = express();
app.use((req, res, next) => {
res.setHeader("Access-Control-Allow-Origin", "*");
next();
});
app.use(express.static("public"));
app.get("/stream", (req, res) => {
res.writeHead(200, {
"Connection": "keep-alive",
"Cache-Control": "no-cache",
"Content-Type": "text/event-stream",
});
let counter = 0;
const interval = setInterval(() => {
const chunk = JSON.stringify({chunk: counter++});
res.write(`data: ${chunk}\n\n`);
}, 1000);
res.on("close", () => {
clearInterval(interval);
res.end();
});
});
const listener = app.listen(process.env.PORT || 3000, () =>
console.log(`SSE endpoint is listening on port ${listener.address().port}`)
);
server.js(这是你的 API):
const express = require("express");
const puppeteer = require("puppeteer");
const app = express();
app.use(express.static("public"));
const browserReady = puppeteer.launch();
app.get("/stream", async (req, res) => {
res.writeHead(200, {
"Connection": "keep-alive",
"Cache-Control": "no-cache",
"Content-Type": "text/event-stream",
});
let page;
try {
page = await (await browserReady).newPage();
await page.exposeFunction("writeData", data => {
res.write(data);
});
await page.exposeFunction("endStream", () => {
res.end();
});
await page.evaluate(async () => {
const output = await fetch(
"http://localhost:3000/stream"
);
const reader = output.body.getReader();
while (true) {
const {done, value} = await reader.read();
if (done) {
return window.endStream();
}
const decoder = new TextDecoder();
const decodedData = decoder.decode(value);
window.writeData(decodedData);
}
});
} catch (err) {
console.error(err);
res.end();
} finally {
await page.close();
}
});
const listener = app.listen(process.env.PORT || 3001, () =>
console.log(
`Proxy server is listening on port ${listener.address().port}`
)
);
注意:此代码用于作为 POC 进行演示,并不一定演示最佳实践。
如果 SSE 端点由另一个页面进行流式传输,请确保您正确拦截或使用它。问题很可能就在那里,细节在那个阶段很重要。根据您想要实现的目标,可能有一种更简单的方法来实现,无论基本目标是什么——这也是完整上下文如此重要的另一个原因。