如何从 puppeteer 中 page.evaluate() 内部的 fetch() 获取流数据? (节点.js)

问题描述 投票:0回答:1

这是我的代码的简化版本:

app.post("/test", async(req, res) => {

    const browser = await puppeteer.launch({
            headless: true, /* I've tried with "new" and false too */
    });

    page = await browser.newPage();

    var desiredUrl = "url here";
    await page.goto(desiredUrl);

    /* Stream data from the page */
    await page.exposeFunction('writeData', (data) => {
            console.log("Writing data");
            res.write(data);
    });

    /* End stream */
    await page.exposeFunction('endStream', () => {
            console.log("End stream");
            res.end();
    });

    await page.evaluate(async ()=>{
        var output = await fetch("/endpoint_here", {
    "headers": {
            /* headers here */
               },
        });

        var reader = output.body.getReader();

        while (true) {
            var { done, value } = await reader.read();
            if (done) {
                window.endStream();
                return;
            }
            
            var decoder = new TextDecoder();
            var decodedData = decoder.decode(value);
            window.writeData(decodedData);
        }
    });

})

但是,这行不通。我尝试过的方法如下:

res 在 page.evaluate() 内部不起作用。我尝试过将 res 发送到 page.evaluate() 中,但它破坏了代码。

我尝试使用 page.exposeFunction() 并在那里执行 res.write (和 res.end() ),它有效,但只是第一次。第二次(以及此后的每次)我发送 post 请求时,代码运行正常(它在这些函数中执行了 console.logs ),但它没有执行 res.write() 和 res.end()完全没有。

我什至尝试使用公开的函数让它更新 page.evaluate() 内的全局变量,使用代理检测该变量的变化并执行 res.write() 来写入数据,但这之后也崩溃了第一个帖子请求。

解决这个奇怪问题的唯一方法是重新启动程序,这显然不是一个解决方案。

我还尝试将流数据记录到页面中的控制台,并使用 page.on('console') 将数据 res.write() 返回到客户端。这对于一次一个请求来说非常有效。但是,当对端点“/test”有多个同时请求时,它会将响应写入两个客户端,而不仅仅是发起请求的客户端。

唯一有效的就是在获取结束后返回响应,而不对其进行流式传输。不过,我希望它能被直播。

我被困住了,不知道该怎么办,因此我们将不胜感激。

node.js fetch puppeteer
1个回答
0
投票

假设您将

decodedData
传递到
window.writeData(decodedData);
,我无法重现。该问题似乎与您正在访问的端点和/或您的服务器配置有关。我建议分享该信息或尝试构建您自己的复制品。

这是我的复制尝试,希望对您有帮助。如果你运行你可以看到代码工作

$ node -v
v20.11.1
$ npm i
$ node sse-endpoint &
$ node server &
$ curl localhost:3001/stream
data: {"chunk":0}

data: {"chunk":1}

data: {"chunk":2}

data: {"chunk":3}

# ... and so on, streamed every second ...

package.json:

{
  "dependencies": {
    "express": "^4.19.2",
    "puppeteer": "^22.6.0"
  }
}

sse-endpoint.js(这是您正在拦截的远程 API 的模拟):

const express = require("express");
const app = express();

app.use((req, res, next) => {
  res.setHeader("Access-Control-Allow-Origin", "*");
  next();
});
app.use(express.static("public"));

app.get("/stream", (req, res) => {
  res.writeHead(200, {
    "Connection": "keep-alive",
    "Cache-Control": "no-cache",
    "Content-Type": "text/event-stream",
  });

  let counter = 0;
  const interval = setInterval(() => {
    const chunk = JSON.stringify({chunk: counter++});
    res.write(`data: ${chunk}\n\n`);
  }, 1000);

  res.on("close", () => {
    clearInterval(interval);
    res.end();
  });
});

const listener = app.listen(process.env.PORT || 3000, () =>
  console.log(`SSE endpoint is listening on port ${listener.address().port}`)
);

server.js(这是你的 API):

const express = require("express");
const puppeteer = require("puppeteer");
const app = express();

app.use(express.static("public"));
const browserReady = puppeteer.launch();

app.get("/stream", async (req, res) => {
  res.writeHead(200, {
    "Connection": "keep-alive",
    "Cache-Control": "no-cache",
    "Content-Type": "text/event-stream",
  });

  let page;
  try {
    page = await (await browserReady).newPage();
    await page.exposeFunction("writeData", data => {
      res.write(data);
    });
    await page.exposeFunction("endStream", () => {
      res.end();
    });
    await page.evaluate(async () => {
      const output = await fetch(
        "http://localhost:3000/stream"
      );
      const reader = output.body.getReader();

      while (true) {
        const {done, value} = await reader.read();

        if (done) {
          return window.endStream();
        }

        const decoder = new TextDecoder();
        const decodedData = decoder.decode(value);
        window.writeData(decodedData);
      }
    });
  } catch (err) {
    console.error(err);
    res.end();
  } finally {
    await page.close();
  }
});

const listener = app.listen(process.env.PORT || 3001, () =>
  console.log(
    `Proxy server is listening on port ${listener.address().port}`
  )
);

注意:此代码用于作为 POC 进行演示,并不一定演示最佳实践。

如果 SSE 端点由另一个页面进行流式传输,请确保您正确拦截或使用它。问题很可能就在那里,细节在那个阶段很重要。根据您想要实现的目标,可能有一种更简单的方法来实现,无论基本目标是什么——这也是完整上下文如此重要的另一个原因。

© www.soinside.com 2019 - 2024. All rights reserved.