当处理逻辑异步时如何同步处理node.js流?

问题描述 投票:0回答:1

我正在尝试从流中读取并同步处理数据。

问题在于,处理逻辑是异步方法(返回诺言)。这是一个示例:

stream.on("data", async (data) => {
  await db.collection("mydb").insertMany(data)
}).on("end", () => {
  console.log("finished")
})

如果运行此命令,我认为它将迭代流并向数据库发出许多并发的insertMany请求,因为它不等待insertMany完成,这将使数据库超载。我想确保一次仅处理一次此功能。

如何执行?

javascript node.js
1个回答
0
投票

首先,除非您的流处于对象模式,否则无法保证在给定data事件中到达的内容恰好是您要插入数据库中的内容。因此,您可能必须解析流以为每个插入物收集正确的数据集。

然后,如果您一次只想插入一个,则有两种策略:

  1. 您可以在调用插入之前暂停流,然后在插入完成后可以恢复流。

  2. 您可以允许流继续读取并触发data事件,并将数据排队到某种队列中,然后从队列中一次插入它们。您可以使用物理队列或链式承诺

这是暂停选项的外观:

stream.on("data", async (data) => {
  // sort out data into whole chunks, a stream (unless it's in "object mode")
  //    can give you data events for any arbitrary chunk of data, not just the
  //    chunks you may want to insert into your database
  stream.pause();
  try {
      await db.collection("mydb").insertMany(data);
      stream.resume();
  } catch(e) {
      // decide what you're doing here if there's an error inserting
      // stream will be paused unless you resume it here
  }
}).on("end", () => {
  console.log("finished")
})
© www.soinside.com 2019 - 2024. All rights reserved.