createReadStream 在解析之前返回

问题描述 投票:0回答:0

我正在编写一个程序来迭代 Linux 中提供的目录中的文件。
我使用了我的函数“myReadFile”,它返回新的承诺,并且只有在调用“end”事件 CB 时它才应该解析。
我对目录中的每个文件使用 myReadFile 并使用“Promise.allSettled”进行调用,但我可以看到它最终然后在调用“end”事件 CB 之前执行。

var fs = require('fs'); // for files and direcroeis read/write
var es = require('event-stream'); // for handling async chuncks from the stream
var now = require('performance-now'); // for timer for each file
var path = require('path') // to check file extension


var hashMap = new Map();
var t0 = now();
var t1;

const checkFilePath = (file) => {
    try {
        stats = fs.statSync(file);
        if (!stats.isFile()) {
            console.log("\033[31mError\033[0m: Specified path is not a File");
            return false;
        }
    } catch (e) {
        console.log("\033[31mError\033[0m: Invalid File Path");
        return false;
    }
    return true;
}


const checkDirectoryPath = (file) => {
    try {
        stats = fs.lstatSync(file);
        if (!stats.isDirectory()) {
            console.log("\033[31mError\033[0m: Specified path is not a directory");
            return false;
        }
    } catch (e) {
        console.log("\033[31mError\033[0m: Invalid directory Path");
        return false;
    }
    return true;
}

const IsTextFile = (file) => {
    try {
        return path.extname(file) == ".txt";
    } catch (e) {
        console.log("\033[31mError\033[0m: Invalid file path");
        return false;
    }
}


const checkFormat = () => {
    if (process.argv.length != 3) {
        console.log("\033[31mError\033[0m: Usage: node app.js [path to directory]");
        return false;
    }
    return true;
}

const myReadFile = async (fileName) => {
    return new Promise((resolve, reject) => {
        console.log("processing file name: ", fileName);
        const stream = fs.createReadStream(fileName).pipe(es.split()).pipe(
            es.mapSync((line) => {
                line.match(/\p{L}+/gu)?.map((word) => {
                    if (hashMap.has(word)) {
                        hashMap.set(word, hashMap.get(word) + 1);
                    } else {
                        hashMap.set(word, 1);
                    }
                })
            })
                
        )
        .on('error', (err) => {
            console.log('\033[31mError\033[0m: while reading file.', fileName, err);
            reject();
        })
        .on('end', () => {
            t1 = now();
            console.log(`Done processing file ` + fileName + ` in ` + (t1 - t0).toFixed(3) + `ms`);
            resolve(1);

        })
  
    });
}


 const main = async() => {
    const myPromises = []

    if (!checkFormat() || !checkDirectoryPath(process.argv[2])) {
        process.exit();
    }
    const directoryPath = process.argv[2];

    fs.readdir(directoryPath, function (err, files) {
        //handling error
        if (err) {
            return console.log('Unable to scan directory: ' + err);
        }
        //listing all files using forEach
        files.forEach(function (file) {
            // Do whatever you want to do with the file
            if (IsTextFile(file)) {
                //console.log("proccesing file name: ", file);
                myPromises.push(myReadFile(file))
            }
        });
    });

    await Promise.allSettled(myPromises).then((values) => {
        console.log("allSettled2 values: ", values);
    }).finally(() => {
        console.log("done");
    });


}

main();

对于上面的代码,我可以看到以下输出:

 
$ node app.js textFilesDir/
allSettled2 values:  []
done
processing file name:  fileBig1.txt
processing file name:  fileSmall1.txt
processing file name:  fileSmall2.txt
Done processing file fileSmall1.txt in 1143.886ms
Done processing file fileSmall2.txt in 4203.455ms
Done processing file fileBig1.txt in 66630.910ms

如您所见,“finally”和“then”的控制台日志在函数调用的解析之前打印。
你知道为什么以及如何解决吗?
感谢您的帮助!

额外:
这个程序应该获取目录路径并从所有文本文件(非常大的文件)返回单词计数器,如果您有一些改进建议那就太好了

我尝试了不同的 Promise 函数,例如 Promise.all 等。
我尝试将函数更改为箭头函数,但我能想到的都没有解决问题

javascript node.js streaming sf nodejs-stream
© www.soinside.com 2019 - 2024. All rights reserved.