节点上的路由采用zip文件,然后从中将其解压缩到内存中并将其上传到S3存储桶。一切都工作正常,但我正在努力解决zip文件中的所有文件完成后。
function unzipAndUploadToS3(fileInfo) {
return new Promise((resolve, reject) => {
fs.createReadStream(fileInfo.zip.path)
.pipe(unzipper.Parse())
.pipe(etl.map(entry => {
if (checkEntry(entry.path)) {
fileInfo.name = entry.path;
entry
.buffer()
.then(content => {
fileInfo.data = content;
AWS.uploadToS3(fileInfo).then(result => {
console.log(result.Location);
resolve(result.Location); //ALWAYS Resolves here
}).catch(err => {
console.error(err);
reject(err);
})
})
}
else
entry.autodrain();
}))
});
}
我尝试过Promise.all和Async / Await,但似乎可以解决它。
我之前从未使用过etl
,但他们的documentation有评论说
从流转换到承诺链......
然后给出代码.promise().then(...)
。你说你尝试过Promise.all
,但你没有说怎么做,所以我不知道你是否已经尝试过这个。但这是我认为可能会发生的方向:
function unzipAndUploadToS3(fileInfo) {
return new Promise((resolve, reject) => {
fs.createReadStream(fileInfo.zip.path)
.pipe(unzipper.Parse())
.pipe(etl.map(entry => {
if (checkEntry(entry.path)) {
fileInfo.name = entry.path;
return entry.buffer() //<- return promise
.then(content => {
fileInfo.data = content;
return AWS.uploadToS3(fileInfo) //<- return promise
})
}
else
entry.autodrain();
}))
.promise().then(awsPromises => Promise.all(awsPromises)) //<- return promise
.then(x => resolve('x should be an array of s3 results'))
.catch(err => reject(err));
});
}
为了实现这一点,尽管所有的promise都需要正确链接在一起以进行条目缓冲,然后上传到s3。我标记了重要的回报,以保持与评论的链接。
还有一些对我来说有点不对劲:我无法相信你能够继续重复使用同样的fileInfo
这样的多个文件。对我来说看起来像竞争条件:在下一个文件覆盖fileInfo.data
和fileInfo.name
之前,上一次上传是否完成?我的猜测是你最好在地图中创建一个新的fileInfo对象,而不是对zip中的所有文件重复使用相同的文件。
这是最终为我工作的东西。这使用unzipper,etl和lodash。
function unzipAndUploadToS3(fileInfo) {
return new Promise((resolve, reject) => {
var filesToProcess = [];
var filesStream = fs.createReadStream(fileInfo.zip.path)
.pipe(unzipper.Parse())
.pipe(etl.map(entry => {
if (checkEntry(entry.path)) {
fileInfo.name = entry.path.substr(entry.path.indexOf('/') + 1, entry.path.length);
entry
.buffer()
.then(content => {
fileInfo.data = content;
var newObj = _.clone(fileInfo); //need to clone object because of object reference in assignment
var promise = new Promise(function(resolve, reject) { //create new promise to upload to S3
AWS.uploadToS3(newObj).then(result => { //a function in another module uses aws-sdk
resolve(result)
}).catch(err => reject(err));
})
filesToProcess.push(promise); //push this promise into an array of promises
})
}
else
entry.autodrain();
}))
filesStream.on('readable', function () { console.log('readable'); })
.on('data', function (data) { console.log('data', data); })
.on('error', function (err) {
console.error('Error', err);
reject(err);
})
.on('end', function () {
Promise.all(filesToProcess).then(values => { //process all the promises
console.log("values>", values);
resolve(values);
}).catch(err => {reject(err)});
})
});
}