我正在寻找针对非常大的文件的 S3 分段上传的示例实现。网上找了各种半途而废的实现,但没有一个能满足我的要求。
下面是我能想到的最简单、最高效的实现。我希望能以某种方式使用 Promise.all() 来加速上传过程,但我发现在高度异步的实现中很难维护分段(part)的顺序。我用 6GB 的文件和目录对此进行了测试。
const fs = require('fs')
const _ = require('underscore')
const mime = require('mime-types')
let AWS = require('aws-sdk/index');
AWS.config.region = 'us-west-2';
let s3 = new AWS.S3();
/**
 * Streams a large local file to S3 using the multipart upload API.
 * Parts are uploaded sequentially in file order.
 *
 * @param {{Bucket: string, Key: string}} params - target bucket and object key.
 * @param {string} filePath - path of the local file to upload.
 * @returns {Promise<Object>} the completeMultipartUpload response from S3.
 * @throws rethrows any part/complete failure after aborting the pending upload.
 */
async function multipartUploadFile(params, filePath) {
console.log(`S3Helper: Beginning multipart upload of file ${params.Key} to ${params.Bucket}`)
//Step 1: register the upload and obtain the UploadId that tags every part.
let multipartCreateResult = await s3.createMultipartUpload({
Bucket: params.Bucket,
Key: params.Key,
ContentType: mime.lookup(filePath),
StorageClass: 'STANDARD'
}).promise();
console.log("S3Helper: multipartUploadFile createResult - ", multipartCreateResult)
let chunkCount = 1;
// S3 requires every part except the last to be at least 5 MiB; 10 MiB stays safely above.
const CHUNK_SIZE = 10 * 1024 * 1024;
let uploadedParts = []
//Step 2: read the file in CHUNK_SIZE slices and upload each slice as one numbered part.
async function gatherChunks() {
const stream = fs.createReadStream(filePath, { highWaterMark: CHUNK_SIZE });
for await (const data of stream) {
const result = await s3.uploadPart({
Body: data,
Bucket: params.Bucket,
Key: params.Key,
PartNumber: chunkCount,
UploadId: multipartCreateResult.UploadId,
}).promise();
uploadedParts.push({
ETag: result.ETag.toString(),
PartNumber: chunkCount
})
chunkCount++;
}
}
try {
// BUG FIX: the original fired gatherChunks().then(...) without awaiting it, so this
// function resolved before any part finished and the caller could not see failures.
await gatherChunks();
let sortedUploads = _.sortBy(uploadedParts, 'PartNumber')
console.log("Sorted uploadedParts: ", sortedUploads)
//Step 3: stitch the parts together; S3 rejects the call if parts are out of order.
return await s3.completeMultipartUpload({
Bucket: params.Bucket,
Key: params.Key,
MultipartUpload: {
Parts: sortedUploads
},
UploadId: multipartCreateResult.UploadId
}).promise()
} catch (err) {
// Abort so the bucket is not billed for orphaned parts, then surface the failure.
await s3.abortMultipartUpload({
Bucket: params.Bucket,
Key: params.Key,
UploadId: multipartCreateResult.UploadId
}).promise().catch(() => {});
throw err;
}
}
let params = {
Bucket: "someBucket",
Key: "someKey"
};
let filePath = "./someFilePath";
// BUG FIX: top-level `await` is a SyntaxError in a CommonJS script (this file uses
// require), so run the upload inside an async IIFE and report failure via exit code.
(async () => {
await multipartUploadFile(params, filePath);
})().catch((err) => {
console.error("Multipart upload failed:", err);
process.exitCode = 1;
});
这是我对这个问题的贡献。
谢谢 jim-chertkov,根据您上面的回答,我能够完成此任务。
Promise.all 运行正常。我只需在发送到 completeMultipartUpload 之前对结果进行排序。
async multipartUploadFile({file, key}) {
const { awsKey, secret, bucket } = this.retrieveConfigs()
aws.config.update({
accessKeyId: awsKey,
secretAccessKey: secret,
region: 'sa-east-1',
})
const s3 = new aws.S3();
const uploadParams = {
Bucket: bucket,
Key: key,
};
const upload = await s3.createMultipartUpload(uploadParams).promise();
const fileBuffer = Buffer.from(file.toString('base64'));
const partSize = 5 * 1024 * 1024;
const parts = [];
let offset = 0;
while (offset < fileBuffer.length) {
const part = Buffer.from(fileBuffer.slice(offset, offset + partSize));
parts.push(part);
offset += partSize;
}
const uploadedParts = [];
const promises = parts.map(async (partData, index) => {
const uploadPartParams = {
Bucket: bucket,
Key: uploadParams.Key,
UploadId: upload.UploadId,
PartNumber: index + 1,
Body: partData,
};
const part = await s3.uploadPart(uploadPartParams).promise()
console.log('part', part)
const { ETag } = part;
uploadedParts.push({ PartNumber: index + 1, ETag: ETag });
});
await Promise.all(promises);
const orderned = uploadedParts.sort(this.orderParts)
console.log('orderned', orderned)
const completeParams = {
Bucket: bucket,
Key: uploadParams.Key,
MultipartUpload: { Parts: orderned },
UploadId: upload.UploadId,
};
console.log('completeParams', JSON.stringify(completeParams, null, 2))
const complete = await s3.completeMultipartUpload(completeParams).promise()
console.log('complete', complete)
return complete
}
日志只是为了显示发生了什么。