我必须在 NodeJS 中有效地处理大型文本文件,但似乎无法比下面的代码进一步改进。
输入文件是约 50 GB 的文本文件,处理逻辑是计算某种类型的总和并生成聚合数据的输出文件。
起初,我试图一次读取整个文件,但达到了堆空间限制,因此我将读取部分分成 1,000,000 行的块,但这已经是我能达到的最快速度,即 1 小时 20 分钟读取 300,000,000 行。我尝试了一种基于承诺的方法,其中块的聚合与下一个块读取并行发生,也考虑了工作线程,但这对于将大数据传递到线程来说并不理想。
我需要帮助审查和改进此代码,我不确定聚合是否与下一个读取阶段并行运行?
我正在使用 line-reader 便利库。
async function parseFileFast(fileName_) {
var end, start = Date.now();
///... basic checks omitted
var chunkSize = 3000000;
var chunkIndex = 0;
var chunkCounter = 0;
var lineCounter = 0;
var chunkData = [];
console.log("processing .. " + fileName_);
var result = [];
// return new Promise((ok, ko) => {
lineReader.eachLine(fileName_, async function(line) {
chunkData.push(line);
chunkCounter++;
lineCounter++;
if (chunkCounter >= chunkSize) {
console.log("reaching boundary .. " + chunkIndex);
console.log("processed lines so far .. " + lineCounter);
aggregate(chunkData, chunkIndex).then((data) => {
result.push(...data.frames);
});
chunkData = [];
chunkCounter = 0;
chunkIndex++;
}
}, async function(err) {
if (err) throw err;
var status = writeOutput2(result, fileName_.replace(".csv", "3.csv"));
end = Date.now();
console.log("finished processing everything in sec: " + ((end - start) / 1000));
return status;
});
}
function aggregate(dataSet, index){
return new Promise((resolve, reject)=>{
var frames = new Map();
dataSet.forEach((line) => {
frames = processFrames(line, frames);
});
var result = { frames: frames, index: index};
resolve(result);
})
}
我的猜测是聚合正在阻塞主线程,事件队列正在等待,直到 CPU 上完成此处理,以便通知下一个读取块,尝试将聚合放在有限的工作线程池后面,然后进行测量。