我使用 sax 和 zlib 读取一个 gz 文件,然后更新我的数据库, 看来我跳过了一些文件内容。
这是我的代码:
// variables to demonstrate the weired behovior
let recordsNum = 1;
let recordsNum2 = 1;
let numOfFailed = 0;
async function boo(...args){
const saxStream = sax.createStream(true); // strict mode
let currentElement = null;
let currentObject = null;
saxStream.on("opentag", (node) => {
if (node.name === "Item") {
currentObject = {};
} else {
currentElement = keyTransforms[node.name] ;
}
});
saxStream.on("closetag", async (nodeName) => {
if (nodeName === "Item") {
// process the current object
recordsNum++;
if (arrOfRecoreds.length <= 100 && currentObject) {
arrOfRecoreds.push(currentObject);
recordsNum2++;
}
if (!currentObject) {
numOfFailed++;
}
if (arrOfRecoreds.length === 100) {
await insertBatch(arrOfRecoreds);
arrOfRecoreds = [];
}
// reset the current object
currentObject = null;
} else {
currentElement = null;
}
});
}
const readStream = fs
.createReadStream("./temp/" + fileName)
.pipe(zlib.createGunzip())
.pipe(saxStream);
await new Promise((resolve, reject) => {
readStream.on("end", async () => {
if (arrOfRecoreds.length > 0) {
await insertBatch(arrOfRecoreds);
resolve();
}
});
readStream.on("error", reject);
});
return new Promise((res) => {
pool.end();
console.log("Finished processing file.");
res({
statusCode: 201,
message: "update db",
});
});
其中 insertBatch 函数只是根据记录更新数据库:
const client = await pool.connect();
try {
await client.query("BEGIN");
await doSomthing(recoreds)
await client.query("COMMIT");
}
catch(e){
await client.query("ROLLBACK");
}finally{
client.release()
}
代码没有错误,但它包含一些我无法理解的错误:
第一个错误: 调用嘘声时:
boo("str", "filename.rar").then(() => {
console.log("num of recordes ", recordsNum);
console.log("num of recordes2 ", recordsNum2);
console.log("num of failed ", numOfFailed);
});
日志是: 记录数 100 记录数2 45 失败的次数 6
这些错误使这个脚本有问题,因为它没有更新我的 postgress db 中的所有记录
你没有在你的代码中声明
arrOfRecoreds
。
您的
closetag
处理程序是异步的,但 sax
不会等待异步处理程序完成。这意味着当你发出数据库操作命令insertBatch(arrOfRecoreds)
后,sax
会继续解析。将发出另一个closetag
事件,并将新条目推送到arrOfRecoreds
before数据库操作完成并执行arrOfRecoreds = []
。在那个时间点,数据库操作正在进行时被推入其中的所有内容都丢失了。
如果不等待数据库操作并在触发后立即重置
arrOfRecoreds
,则可以避免这种情况:
if (arrOfRecoreds.length === 100) {
insertBatch(arrOfRecoreds);
arrOfRecoreds = [];
}