nodejs 在 createReadStream 中异步等待

问题描述 投票:0回答:3

我正在逐行读取 CSV 文件并在 MongoDB 中插入/更新。预期输出将是 1.console.log(行); 2. console.log(光标); 3.console.log("流");

但是得到像这样的输出 1.console.log(行); 控制台.log(行);控制台.log(行);控制台.log(行);控制台.log(行); …………………… 2. console.log(光标); 3.console.log("流"); 请让我知道我在这里缺少什么。

const csv = require('csv-parser');
const fs = require('fs');

var mongodb = require("mongodb");

var client = mongodb.MongoClient;
var url = "mongodb://localhost:27017/";
var collection;
client.connect(url,{ useUnifiedTopology: true }, function (err, client) {

  var db = client.db("UKCompanies");
  collection = db.collection("company");
  startRead();
});
var cursor={};

async function insertRec(row){
  console.log(row);
  cursor = await collection.update({CompanyNumber:23}, row, {upsert: true});
  if(cursor){
    console.log(cursor);
  }else{
    console.log('not exist')
  }
  console.log("stream");
}



async function startRead() {
  fs.createReadStream('./data/inside/6.csv')
    .pipe(csv())
    .on('data', async (row) => {
      await insertRec(row);
    })
    .on('end', () => {
      console.log('CSV file successfully processed');
    });
}
node.js async-await nodejs-stream
3个回答
23
投票

在您的

startRead()
函数中,当
await insertRec()
正在处理时,
data
不会阻止更多
insertRec()
事件流动。因此,如果您不希望下一个
data
事件运行直到
insertRec()
完成,则需要暂停,然后恢复流。

async function startRead() {
  const stream = fs.createReadStream('./data/inside/6.csv')
    .pipe(csv())
    .on('data', async (row) => {
      try {
        stream.pause();
        await insertRec(row);
      } finally {
        stream.resume();
      }
    })
    .on('end', () => {
      console.log('CSV file successfully processed');
    });
}

仅供参考,如果

insertRec()
失败,您还需要一些错误处理。


5
投票

Node 10+ ReadableStream 获得属性 Symbol.asyncIterator 并且它允许使用 for-await-of

处理流
async function startRead() {
    const readStream = fs.createReadStream('./data/inside/6.csv');    
    
    for await (const row of readStream.pipe(csv())) {
        await insertRec(row);
    }

    console.log('CSV file successfully processed');
}

2
投票

在这种情况下这是预期的行为,因为当数据在流中可用时,您的

on
数据侦听器会异步触发
insertRec
。这就是为什么你的插入方法的第一行是并行执行的。如果您想控制此行为,可以在创建读取流时使用
highWaterMark
(https://nodejs.org/api/stream.html#stream_read_readhighwatermark) 属性。这样您将一次获得 1 条记录,但我不确定您的用例是什么。

类似这样的事情

fs.createReadStream(`somefile.csv`, {
  "highWaterMark": 1
})

您也没有在等待您的

startRead
方法。我会将其包装在 Promise 中并在
end
侦听器中解决它,否则您将不知道处理何时完成。类似的东西

function startRead() {
  return new Promise((resolve, reject) => {
    fs.createReadStream(`somepath`)
      .pipe(csv())
      .on("data", async row => {
        await insertRec(row);
      })
      .on("error", err => {
        reject(err);
      })
      .on("end", () => {
        console.log("CSV file successfully processed");
        resolve();
      });
  });

}
© www.soinside.com 2019 - 2024. All rights reserved.