Streaming uploads to S3 using multipart upload

Problem description

The project I am currently working on requires multiple processes to upload data into a single file in S3. This data arrives in parallel from several sources, so to process every source as fast as possible we will run multiple Node.js instances, each listening to one source. There are memory and storage constraints, so loading all of the ingested data into memory, or storing it on disk and then performing a single upload, is out of the question.

To respect these constraints, I implemented a streaming upload: it buffers a small chunk of data from a single source and pipes that buffer into an upload stream. This works really well with a single Node.js process, but, as I mentioned, the goal is to process all sources in parallel. My first attempt was to open multiple streams against the same object key in the bucket. That just ends with the file being overwritten by the data from whichever stream closes last, so I discarded this option.

// code for the scenario above, where each process will open a separate stream to
// the same key and perform its data ingestion and upload.
function openStreamingUpload() {
  const stream = require('stream');
  const AWS = require('aws-sdk');
  const s3 = new AWS.S3(/* s3 config */);

  const passThrough = new stream.PassThrough();

  const params = {
    Key: 'final-s3-file.txt',
    Bucket: 'my-bucket',
    Body: passThrough
  };

  s3
    .upload(params)
    .promise();

  return passThrough;
}

async function main() { // simulating a "never ending" flow of data
  const stream = openStreamingUpload();
  let data = await receiveData();

  do {
    stream.write(data);
    data = await receiveData();
  } while (data);
  stream.end();
}
main();

Next, I went on to try the multipart upload offered by the S3 API. First, I create a multipart upload, grab its UploadId and store it in a shared space. After that, I try to open one multipart upload on each of the Node.js processes the cluster will use, all reusing the UploadId obtained beforehand. Each of these multipart uploads should have a stream into which the received data is piped. The problem I ran into is that a multipart upload needs to know the part length in advance, and since I am piping a stream for which I don't know when it will close or how much data it will carry, its size cannot be computed. Code inspired by this implementation.

const AWS = require('aws-sdk');
const s3 = new AWS.S3(/* s3 config */);

async function startMultipartUpload() {
  const multiPartParams = {
    Key: 'final-s3-file.txt',
    Bucket: 'my-bucket'
  };
  const multipart = await s3.createMultipartUpload(multiPartParams).promise();

  return multipart.UploadId;
}

async function finishMultipartUpload(multipartUploadId) {
  const finishingParams = {
    Key: 'final-s3-file.txt',
    Bucket: 'my-bucket',
    UploadId: multipartUploadId
  };
  const data = await s3.completeMultipartUpload(finishingParams).promise();

  return data;
}

function openMultipartStream(multipartUploadId) {
  const stream = require('stream');
  const passThrough = new stream.PassThrough();

  const params = {
    Body: passThrough,
    Key: 'final-s3-file.txt',
    Bucket: 'my-bucket',
    UploadId: multipartUploadId,
    PartNumber: undefined // how do I know this part number when it's, in principle, unbounded?
  };

  s3
    .uploadPart(params)
    .promise();

  return passThrough;
}

// a single process will start the multipart upload
const uploadId = await startMultipartUpload();

async function main() { // simulating a "never ending" flow of data
  const stream = openMultipartStream(uploadId);
  let data = await receiveData();

  do {
    stream.write(data);
    data = await receiveData();
  } while (data);
  stream.end();
}

main(); // all the processes will receive and upload to the same UploadId
finishMultipartUpload(uploadId); // only the last process to close will finish the multipart upload.
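
One detail worth noting: completeMultipartUpload only succeeds when it is given the PartNumber and ETag of every uploaded part, so on top of the UploadId the processes would also have to share that list. Below is a rough sketch of what explicit part-number coordination could look like; it is not working code from my project, and claimNextPartNumber, recordPart and getAllParts are hypothetical placeholders for some shared store (Redis, a database, etc.). It also sidesteps the streaming problem by fully buffering each part, since every part except the last must be at least 5 MiB.

const AWS = require('aws-sdk');
const s3 = new AWS.S3(/* s3 config */);

async function uploadOnePart(uploadId, buffer) {
  // claimNextPartNumber() is assumed to atomically hand out 1, 2, 3, ... across processes.
  const partNumber = await claimNextPartNumber();

  const result = await s3.uploadPart({
    Key: 'final-s3-file.txt',
    Bucket: 'my-bucket',
    UploadId: uploadId,
    PartNumber: partNumber,
    Body: buffer // a fully buffered part, so its length is known up front
  }).promise();

  // completeMultipartUpload will need every part's PartNumber and ETag,
  // so record them in the shared space as well.
  await recordPart({ PartNumber: partNumber, ETag: result.ETag });
}

async function finishWithParts(uploadId) {
  const parts = await getAllParts(); // [{ PartNumber, ETag }, ...] sorted by PartNumber

  return s3.completeMultipartUpload({
    Key: 'final-s3-file.txt',
    Bucket: 'my-bucket',
    UploadId: uploadId,
    MultipartUpload: { Parts: parts }
  }).promise();
}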

While searching, I came across the article from AWS proposing the upload() API method, which says it abstracts the multipart API so that large files can be uploaded from piped data streams. So I'd like to know whether it is possible to obtain the UploadId from the streamed "simple" upload, so I could pass this ID to the cluster, have every process upload to the same object, and still keep the streaming characteristics. Has anyone tried this kind of "multipart, streaming" upload scenario?
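
For reference, this is how I understand the upload() call from the article: it accepts a streamed Body and manages the multipart mechanics internally, and its documented options let you tune part size and concurrency, but as far as I can tell the UploadId it creates internally is not exposed through the public API. A minimal single-process sketch (the option values are just examples):

const stream = require('stream');
const AWS = require('aws-sdk');
const s3 = new AWS.S3(/* s3 config */);

const passThrough = new stream.PassThrough();

// upload() manages the multipart upload internally; partSize and queueSize are
// documented options, but the UploadId it uses is not surfaced to the caller.
const managedUpload = s3.upload(
  { Key: 'final-s3-file.txt', Bucket: 'my-bucket', Body: passThrough },
  { partSize: 10 * 1024 * 1024, queueSize: 4 }
);

managedUpload.promise()
  .then(data => console.log('uploaded to', data.Location))
  .catch(err => console.error(err));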

node.js amazon-web-services amazon-s3 streaming
1 Answer

Did you ever find a "multipart upload" solution? Thanks.
