我正在尝试找到一种方法来即时提取S3中的.gz文件,无需将其下载到本地,提取然后将其推回S3。
使用boto3 + lambda,我怎样才能实现我的目标?
我在boto3文档中没有看到任何摘录部分。
您可以使用 BytesIO 从 S3 流式传输文件,通过 gzip 运行它,然后使用
upload_fileobj
将其通过管道传输回 S3 以写入 BytesIO。
# python imports
import boto3
from io import BytesIO
import gzip
# setup constants
bucket = '<bucket_name>'
gzipped_key = '<key_name.gz>'
uncompressed_key = '<key_name>'
# initialize s3 client, this is dependent upon your aws config being done
s3 = boto3.client('s3', use_ssl=False) # optional
s3.upload_fileobj( # upload a new obj to s3
Fileobj=gzip.GzipFile( # read in the output of gzip -d
None, # just return output as BytesIO
'rb', # read binary
fileobj=BytesIO(s3.get_object(Bucket=bucket, Key=gzipped_key)['Body'].read())),
Bucket=bucket, # target bucket, writing to
Key=uncompressed_key) # target key, writing to
确保您的密钥读取正确:
# read the body of the s3 key object into a string to ensure download
s = s3.get_object(Bucket=bucket, Key=gzip_key)['Body'].read()
print(len(s)) # check to ensure some data was returned
以上答案是针对
gzip
文件的,对于zip
文件,你可以尝试
import boto3
import zipfile
from io import BytesIO
bucket = 'bucket1'
s3 = boto3.client('s3', use_ssl=False)
Key_unzip = 'result_files/'
prefix = "folder_name/"
zipped_keys = s3.list_objects_v2(Bucket=bucket, Prefix=prefix, Delimiter = "/")
file_list = []
for key in zipped_keys['Contents']:
file_list.append(key['Key'])
#This will give you list of files in the folder you mentioned as prefix
s3_resource = boto3.resource('s3')
#Now create zip object one by one, this below is for 1st file in file_list
zip_obj = s3_resource.Object(bucket_name=bucket, key=file_list[0])
print (zip_obj)
buffer = BytesIO(zip_obj.get()["Body"].read())
z = zipfile.ZipFile(buffer)
for filename in z.namelist():
file_info = z.getinfo(filename)
s3_resource.meta.client.upload_fileobj(
z.open(filename),
Bucket=bucket,
Key='result_files/' + f'{filename}')
这适用于您的
zip
文件,并且您的解压缩结果数据将位于 result_files
文件夹中。确保将 AWS Lambda
上的内存和时间增加到最大,因为某些文件非常大并且需要时间来写入。
Amazon S3 是一种存储服务。没有操纵文件内容的内置功能。
但是,您可以使用 AWS Lambda 函数从 S3 检索对象,解压缩它,然后再次上传内容。但请注意,Lambda 的临时磁盘空间默认限制为 500MB,因此请避免同时解压太多数据。
您可以将 S3 存储桶配置为在存储桶中创建新文件时触发 Lambda 函数。 Lambda 函数将:
gzip
Python 库提取文件示例代码:
import gzip
import io
import boto3
bucket = '<bucket_name>'
key = '<key_name>'
s3 = boto3.client('s3', use_ssl=False)
compressed_file = io.BytesIO(
s3.get_object(Bucket=bucket, Key=key)['Body'].read())
uncompressed_file = gzip.GzipFile(None, 'rb', fileobj=compressed_file)
s3.upload_fileobj(Fileobj=uncompressed_file, Bucket=bucket, Key=key[:-3])
使用 BytesIO 打开大型 zip 文件导致内存错误。
我正在打开 6GB 以上的 zip 文件,其中充满了要放置在另一个文件夹中的文件。使用 buffer = BytesIO(zip_obj.get()["Body"].read()),我遇到了内存不足错误,因为它将主体缓冲到内存中。我限制了 lambda 的内存,因为我可能会接收更大的文件,并希望确保它是流式传输,并且 lambda 最大内存 10120 不会成为问题。我使用 python smart_open 作为层来传输 zip 文件以对其进行处理。
lambda 使用不到 150mb 和 @500sec 来处理 6GB 以上的 zip 文件。因此 lambda max 900 秒成为您的限制因素,并且需要对每个提取的文件进行异步处理来克服。
layers_dir = '/opt'
sys.path.insert(0, layers_dir)
from smart_open import open
class Unzip():
...
def process_zip(self):
with zipfile.ZipFile(open(f's3://{self.bucket}/{self.key}', 'rb', transport_params={'client':self.s3.client})) as zip_ref:
for file_info in zip_ref.infolist():
if not file_info.is_dir():
file_name = file_info.filename
key = os.path.join(self.outputPath, file_name)
print(f'Extracting {key}')
with zip_ref.open(file_name, 'r') as data:
self.s3.client.upload_fileobj(
Fileobj=data,
Bucket=self.bucket,
Key=key
)
另一个可以尝试的选择是 s3Wrapper
对于非常大的文件,您应该使用 EC2 实例,并使用 httpx 和 Stream-unzip 使用其 s3 url 读取 zip 文件。下面的代码可以为您完成这项工作:
import boto3
from stream_unzip import stream_unzip
import httpx
from io import BytesIO
s3_client = boto3.client('s3')
bucket_name = 'bucket_name'
def zipped_chunks():
with httpx.stream('GET', 'https://bucket.s3.amazonaws.com/path/file.zip') as r:
yield from r.iter_bytes(chunk_size=65536)
for file_name, file_size, unzipped_chunks in stream_unzip(zipped_chunks()):
s3_key = f'unzipped/{file_name}'.replace("'","")
buffer = BytesIO()
for chunk in unzipped_chunks:
buffer.write(chunk)
buffer.seek(0)
s3_client.upload_fileobj(buffer, bucket_name, s3_key)
buffer.close()
print(f"File '{file_name}' has been uploaded to S3 bucket '{bucket_name}' with key '{s3_key}'")