我正在尝试从流中读取并同步处理数据。
问题在于,处理逻辑是异步方法(返回诺言)。这是一个示例:
stream.on("data", async (data) => {
await db.collection("mydb").insertMany(data)
}).on("end", () => {
console.log("finished")
})
如果运行此命令,我认为它将迭代流并向数据库发出许多并发的insertMany
请求,因为它不等待insertMany
完成,这将使数据库超载。我想确保一次仅处理一次此功能。
如何执行?
首先,除非您的流处于对象模式,否则无法保证在给定data
事件中到达的内容恰好是您要插入数据库中的内容。因此,您可能必须解析流以为每个插入物收集正确的数据集。
然后,如果您一次只想插入一个,则有两种策略:
您可以在调用插入之前暂停流,然后在插入完成后可以恢复流。
您可以允许流继续读取并触发data
事件,并将数据排队到某种队列中,然后从队列中一次插入它们。您可以使用物理队列或链式承诺
这是暂停选项的外观:
stream.on("data", async (data) => {
// sort out data into whole chunks, a stream (unless it's in "object mode")
// can give you data events for any arbitrary chunk of data, not just the
// chunks you may want to insert into your database
stream.pause();
try {
await db.collection("mydb").insertMany(data);
stream.resume();
} catch(e) {
// decide what you're doing here if there's an error inserting
// stream will be paused unless you resume it here
}
}).on("end", () => {
console.log("finished")
})