在读取流和更新数据库时使用 sax 和 zlib - 奇怪的行为

问题描述 投票:0回答:1

我使用 sax 和 zlib 读取一个 gz 文件,然后更新我的数据库, 看来我跳过了一些文件内容。

这是我的代码:

// variables to demonstrate the weired behovior
let recordsNum = 1;
let recordsNum2 = 1;
let numOfFailed = 0;

async function boo(...args){
  const saxStream = sax.createStream(true); // strict mode
  let currentElement = null;
  let currentObject = null;

    saxStream.on("opentag", (node) => {
    if (node.name === "Item") {
      currentObject = {};
    } else {
      currentElement = keyTransforms[node.name] ;
    }
  });

  saxStream.on("closetag", async (nodeName) => {
    if (nodeName === "Item") {
      // process the current object
      recordsNum++;
      if (arrOfRecoreds.length <= 100 && currentObject) {
        arrOfRecoreds.push(currentObject);
        recordsNum2++;
      }

      if (!currentObject) {
        numOfFailed++;
      }

      if (arrOfRecoreds.length === 100) {
        await insertBatch(arrOfRecoreds);
        arrOfRecoreds = [];
      }

      // reset the current object
      currentObject = null;
    } else {
      currentElement = null;
    }
  });


}

 const readStream = fs
    .createReadStream("./temp/" + fileName)
    .pipe(zlib.createGunzip()) 
    .pipe(saxStream);

await new Promise((resolve, reject) => {
    readStream.on("end", async () => {
      if (arrOfRecoreds.length > 0) {
        await insertBatch(arrOfRecoreds);
        resolve();
      }
    });
    readStream.on("error", reject);
  });


  return new Promise((res) => {
    pool.end();
    console.log("Finished processing file.");

    res({
      statusCode: 201,
      message: "update db",
    });
  });

其中 insertBatch 函数只是根据记录更新数据库:

const client = await pool.connect();
  try {
    await client.query("BEGIN");
    await doSomthing(recoreds)
    await client.query("COMMIT");
    }

    catch(e){
    await client.query("ROLLBACK");
    }finally{
    client.release()
    }

代码没有错误,但它包含一些我无法理解的错误:

第一个错误: 调用嘘声时:

boo("str", "filename.rar").then(() => {
  console.log("num of recordes ", recordsNum);
  console.log("num of recordes2 ", recordsNum2);
  console.log("num of failed ", numOfFailed);
});

日志是: 记录数 100 记录数2 45 失败的次数 6

  1. 为什么 recordsNum 不等于 recordsNum2?
  2. 为什么会有一些失败的对象? XML 文件肯定有效!!!
  3. 每次运行都会更改 recordsNum2 和 failed 的数量

这些错误使这个脚本有问题,因为它没有更新我的 postgress db 中的所有记录

node.js zlib sax pg
1个回答
0
投票

你没有在你的代码中声明

arrOfRecoreds

您的

closetag
处理程序是异步的,但
sax
不会等待异步处理程序完成。这意味着当你发出数据库操作命令
insertBatch(arrOfRecoreds)
后,
sax
会继续解析。将发出另一个
closetag
事件,并将新条目推送到
arrOfRecoreds
before数据库操作完成并执行
arrOfRecoreds = []
。在那个时间点,数据库操作正在进行时被推入其中的所有内容都丢失了。

如果不等待数据库操作并在触发后立即重置

arrOfRecoreds
,则可以避免这种情况:

if (arrOfRecoreds.length === 100) {
  insertBatch(arrOfRecoreds);
  arrOfRecoreds = [];
}
© www.soinside.com 2019 - 2024. All rights reserved.