Nodejs | Unzip and read only the first n lines of a file

Problem description · votes: 0 · answers: 2

I have a zip in S3 that contains hundreds of CSV files. I'm trying to stream the file and only need to read the first n lines of each CSV. I can unzip and read the contents, but I'm not sure how to stop the stream once I've read n lines and move on to the rest of the files.

Code tried so far:

const aws = require("aws-sdk");
const s3 = new aws.S3();
const etl = require("etl");
const unzip = require("unzip-stream");

function setupMetadata() {
  s3.getObject({Bucket: 'test', Key: 'CSV.zip'}).createReadStream()
    .pipe(unzip.Parse())
    .on('entry', function (entry) {
      var i = 0;
      var recordIdentifier;
      entry
      .pipe(etl.map(res => {
        if (recordIdentifier) {
          console.log(recordIdentifier);
          console.log(i++);
        // not sure about this. This works, but only for the 1st file;
        // after that the program terminates. I need to do this for all
        // the files in the zip
          entry.destroy(); 
        }
        const data = res.toString("utf-8");
        var array = data.toString().split("\n");
        if(array.length >= 3) {
          recordIdentifier = array[2].split(",")[0];
        }
      }))
    })
}

setupMetadata();

I've tried calling entry.autodrain() after reading the content, but it doesn't work. entry.destroy() works, but the program terminates right after that. I want to do the same thing for every file in the zip.

Any help would be appreciated.

Thanks in advance.

node.js asynchronous unzip nodejs-stream
2 Answers

1 vote

I've tried to reproduce a similar scenario. I think you need something like this:

const etl = require("etl");
const unzip = require("unzip-stream");
const fs = require('fs');

function readRecord(entry, entNum) {

    let recordCount = 0;
    // parse the entry as CSV and count records as they arrive
    let etlcsv = entry.pipe(etl.csv())
    etlcsv.pipe(etl.map(d => {
        console.log(d);
        recordCount++;
        if (recordCount > 2) {
            // stop parsing this entry and drain what's left of it
            // so the parent unzip stream can emit the next 'entry'
            etlcsv.destroy()
            entry.autodrain();
        }
    }))
}

function setupMetadata() {
    let entryCount = 0;

    let test = fs.createReadStream('csv.zip').pipe(unzip.Parse())
    test.on('entry', function(entry) {
        entryCount++;
        console.log(entryCount)
        readRecord(entry, entryCount)
    })

}

setupMetadata()

Check this REPL for testing: https://repl.it/@sandeepp2016/PlushFatherlyMonotone
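
For reference, the same pattern should drop straight into the S3 stream from the question; a minimal, untested sketch reusing the question's Bucket/Key:

const aws = require("aws-sdk");
const s3 = new aws.S3();
const etl = require("etl");
const unzip = require("unzip-stream");

function readRecord(entry) {
    let recordCount = 0;
    const etlcsv = entry.pipe(etl.csv());
    etlcsv.pipe(etl.map(d => {
        console.log(d);              // first few records of this CSV
        if (++recordCount > 2) {
            etlcsv.destroy();        // stop parsing this entry
            entry.autodrain();       // discard the rest so the next 'entry' event fires
        }
    }));
}

s3.getObject({ Bucket: 'test', Key: 'CSV.zip' }).createReadStream()
    .pipe(unzip.Parse())
    .on('entry', readRecord);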


0 votes

I've done something similar to this using got.

I had to modify the code, so this is untested, but the general approach worked for me. I haven't used etl, so I'm not sure how well it handles partial chunks of a file rather than the whole file.

const aws = require("aws-sdk");
const s3 = new aws.S3();
const got = require('got');
const etl = require("etl");


const getAllFilesAsUrls = async(bucket_name, folder_name) => {

    const listParams = {
        Bucket: bucket_name,
        Delimiter: '/',
        StartAfter: `${folder_name}/`
    };

    const data = await s3.listObjectsV2(listParams).promise();

    const keys = data.Contents.map(object => object.Key);

    const urlsArray = [];

    for (let key of keys) {

        const params = {Bucket: bucket_name, Key: key};
        let url = await s3.getSignedUrlPromise('getObject', params);
        urlsArray.push(url);

    }

    return urlsArray;


}

const workEachFileUrl = (url) => {

    return new Promise((resolve, reject) => {

        // if you're looking to limit the amount of data transferred then this was a good way of doing it
        const gotstream = got.stream(url, { headers: { "accept-encoding": "gzip" } })
            .on('data', (chunk) => {
                // pause the stream as soon as we get the first chunk
                // (you could also destroy it here if you want to stop the download entirely)
                gotstream.pause();
                // do your work with the chunk; as long as etl can handle partial files,
                // resolve with the first few lines, otherwise just use the 'response' event as you were
                let parsedChunk; // parse the first few lines out of `chunk` here
                resolve(parsedChunk);
            })
            .on("error", (err) => {    
                console.log(err);
                reject(err);
            });
    });

}

const runOperation = async (bucket_name, folder_name) => {

    const records = [];

    const urls = await getAllFilesAsUrls(bucket_name, folder_name);

    for (let url of urls) {

        let record = await workEachFileUrl(url);
        records.push(record);

    }

    return records;

}

// call runOperation from within an async context, for example:
(async () => {
    const completedRecords = await runOperation(bucket_name, folder_name);
})();
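
If it turns out etl can't work from a partial chunk, a minimal fallback sketch (a hypothetical firstLines helper, assuming UTF-8 CSVs and that the first chunk already contains at least n line breaks) is to split the raw chunk yourself, much like the splitting done in the question:

// Hypothetical helper: pull the first n lines out of a raw chunk.
// Assumes UTF-8 text and that the chunk contains at least n newlines.
const firstLines = (chunk, n) => chunk.toString("utf-8").split("\n").slice(0, n);

// e.g. inside the 'data' handler above:
// const parsedChunk = firstLines(chunk, 3);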