我正在尝试从 S3 存储桶中的子文件夹中的多个文件创建一个 zip 文件,然后将该 zip 文件保存在同一存储桶中的另一个子文件夹中。
在本地运行 Flask 应用程序时,我可以从 S3 存储桶的子文件夹创建 zip 文件,但部署到 Heroku 后就不行了,因为 Heroku 的文件系统是临时的,不会持久保存任何文件。
我正在查看这个示例,但它似乎过时并且使用本地文件。 https://www.botreetechnologies.com/blog/create-and-download-zip-file-in-django-via-amazon-s3
这是我正在使用的代码片段。
from flask import Response
import boto3, zipfile, os
# Build a zip of every non-empty object under ``blog_folder`` entirely in
# memory and upload it back to the same bucket, so nothing has to be written
# to the local filesystem.
import io  # in-memory buffer that receives the archive bytes

# Prefer environment variables (e.g. Heroku config vars) over secrets committed
# to source; the literals only remain as placeholders.
AWS_ACCESS_KEY_ID = os.environ.get("AWS_ACCESS_KEY_ID", "some access key")
AWS_ACCESS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY", "some secret key")
AWS_STORAGE_BUCKET_NAME = os.environ.get("AWS_STORAGE_BUCKET_NAME", "some bucket")

# The secret constant is named AWS_ACCESS_SECRET_ACCESS_KEY in this script;
# passing any other name here raises NameError.
aws_session = boto3.Session(
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_ACCESS_SECRET_ACCESS_KEY,
)
# Create both handles from the session so they use the credentials above
# instead of silently falling back to boto3's default credential chain.
s3 = aws_session.client("s3", region_name="some region")
s3_resource = aws_session.resource("s3")

blog_folder = "blog_1"
zip_filename = "download_files.zip"
# NOTE(review): destination key assumes a "<blog_folder>/zipped/" layout in the
# same bucket -- adjust to the real target sub-folder.
zip_key = "{}/zipped/{}".format(blog_folder, zip_filename)

paginator = s3.get_paginator("list_objects")
listed_objects = paginator.paginate(Bucket=AWS_STORAGE_BUCKET_NAME).search(
    "Contents[?Size >`0`][]"
)
file_list = [
    obj
    for obj in listed_objects
    # A page without "Contents" makes the JMESPath search yield None.
    if obj is not None
    and blog_folder in obj["Key"]
    # Do not re-zip an archive produced by a previous run.
    and obj["Key"] != zip_key
]

my_bucket = s3_resource.Bucket(AWS_STORAGE_BUCKET_NAME)
zip_buffer = io.BytesIO()
zipped_files = []  # archive member names, in the order they were added
with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf:
    for key in file_list:
        # Members are stored flat under their base name; objects sharing a
        # base name in different sub-folders would collide in the archive.
        file_name = key["Key"].split("/")[-1]
        file_obj = my_bucket.Object(key["Key"]).get()
        zf.writestr(file_name, file_obj["Body"].read())
        zipped_files.append(file_name)

# The archive is only complete once the ZipFile is closed, so upload after
# leaving the ``with`` block.
s3.put_object(Bucket=AWS_STORAGE_BUCKET_NAME, Key=zip_key, Body=zip_buffer.getvalue())
知道如何解决这个问题吗?对于用户来说,下载 zip 文件比下载单个文件要方便得多。 非常感谢任何帮助。
Python 的 zipfile 库配合内存缓冲区(io.BytesIO)非常适合此目的。下面是我的一个项目中的示例:
import io
import zipfile
# Assemble the archive in an in-memory buffer: nothing is written to disk.
zip_buffer = io.BytesIO()
with zipfile.ZipFile(
    zip_buffer,
    mode="a",
    compression=zipfile.ZIP_DEFLATED,
    allowZip64=False,
) as zipper:
    # Fetch the source object and store its bytes under ``file_name``.
    s3_response = s3.get_object(Bucket=bucket, Key=object_key)
    zipper.writestr(file_name, s3_response['Body'].read())
# Upload only after the ``with`` block has closed (and finalised) the archive.
s3.put_object(
    Bucket=bucket,
    Key=PREFIX + zip_name,
    Body=zip_buffer.getvalue(),
)
我设法让它在我的 Heroku Flask 应用程序中工作了,希望能帮到同样遇到这个问题的人。补充说明:子文件夹即 blog_folder,所以目录结构是:存储桶/blog_folder/resources 和 存储桶/blog_folder/zipped。
import tempfile, zipfile, os, boto3
# AWS settings. Values are read from the environment (e.g. Heroku config vars)
# so real secrets never have to be committed; the literals are placeholders
# used only when the corresponding variable is not set.
AWS_ACCESS_KEY_ID = os.environ.get("AWS_ACCESS_KEY_ID", "some access key")
AWS_ACCESS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY", "some secret key")
AWS_STORAGE_BUCKET_NAME = os.environ.get("AWS_STORAGE_BUCKET_NAME", "some bucket")
def make_zipfile(output_filename, source_dir):
    """Recursively zip ``source_dir`` into ``output_filename``.

    Member names are relative to the parent of ``source_dir``, so every entry
    is prefixed with the directory's own name. Directories are written as
    explicit entries so that empty ones are preserved.

    Args:
        output_filename: Path of the zip archive to create (opened in "w"
            mode, so an existing file is overwritten).
        source_dir: Directory whose contents are archived.
    """
    relroot = os.path.abspath(os.path.join(source_dir, os.pardir))
    # When the archive is created inside source_dir, os.walk would find the
    # half-written zip and add it to itself; remember its path to skip it.
    archive_path = os.path.abspath(output_filename)
    with zipfile.ZipFile(output_filename, "w", zipfile.ZIP_DEFLATED) as archive:
        for root, _dirs, files in os.walk(source_dir):
            rel_dir = os.path.relpath(root, relroot)
            # add directory (needed for empty dirs)
            archive.write(root, rel_dir)
            for name in files:
                path = os.path.join(root, name)
                if not os.path.isfile(path):  # regular files only
                    continue
                if os.path.abspath(path) == archive_path:
                    continue
                archive.write(path, os.path.join(rel_dir, name))
# Module-level AWS session and scratch directory, created once at import time.
# The secret constant defined above is AWS_ACCESS_SECRET_ACCESS_KEY; any other
# spelling raises NameError as soon as the module is imported.
aws_session = boto3.Session(
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_ACCESS_SECRET_ACCESS_KEY,
)
s3 = aws_session.resource("s3")
current_path = os.getcwd()
# Scratch directory created under the working directory; it is removed when
# this object is cleaned up.
# NOTE(review): the download route reads a BLOG_FOLDER name that is not defined
# in the visible snippet -- presumably it refers to this directory; confirm.
temp = tempfile.TemporaryDirectory(suffix="_tmp", prefix="basic_", dir=current_path)
### AT TOP OF YOUR APP.PY file ^^^^^^^^^^
@app_blog.route("/download_blog_res_zipfile/<int:blog_id>", methods = ["GET", "POST"])
def download_blog_res_zipfile(blog_id):
    """Zip a blog's S3 resource files and send the archive as a download.

    Steps: bump the blog's download counter, clear archives left by earlier
    requests, download every non-empty object whose key contains
    ``blogs/blog_<id>/resources`` into the local ``BLOG_FOLDER`` directory,
    zip that directory with ``make_zipfile`` and return the zip as an
    attachment. The downloaded copies are always removed afterwards.

    Relies on module-level names not shown in this snippet: ``app_blog``,
    ``Blog``, ``db``, ``REGION_NAME``, ``BLOG_FOLDER`` and
    ``send_from_directory``.

    Args:
        blog_id: Primary key of the blog, taken from the URL.

    Returns:
        The zip file response, or a 404 response when no blog has this id.
    """
    # shutil is not among the imports shown at the top of the snippet; import
    # it here so the stale-archive cleanup cannot fail with NameError.
    import shutil

    blog = Blog.query.filter_by(id=blog_id).first()
    if blog is None:
        # Unknown id: answer 404 instead of crashing on blog.download_count.
        return "Blog not found", 404

    blog.download_count += 1
    db.session.commit()

    current_path = os.getcwd()
    zipped_root = os.path.join(current_path, "BLOG_ZIPPED_FOLDER")
    download_dir = os.path.join(current_path, BLOG_FOLDER)
    zip_name = "blog_res_{}.zip".format(blog_id)

    # Empty directories are not tracked by git, so on a fresh dyno these may
    # not exist yet.
    os.makedirs(zipped_root, exist_ok=True)
    os.makedirs(download_dir, exist_ok=True)

    # Best-effort removal of archives left behind by earlier requests.
    # NOTE(review): with concurrent requests this can delete a directory that
    # another request is still serving from.
    for stale in os.listdir(zipped_root):
        shutil.rmtree(os.path.join(zipped_root, stale), ignore_errors=True)

    temp_zipp = tempfile.TemporaryDirectory(suffix="_tmp", prefix="zipping_",
                                            dir=zipped_root)

    s3 = boto3.client("s3", region_name=REGION_NAME)
    my_bucket = boto3.resource("s3").Bucket(AWS_STORAGE_BUCKET_NAME)
    paginator = s3.get_paginator("list_objects")
    folder = "blogs/blog_{}/resources".format(blog.id)
    listed_objects = paginator.paginate(Bucket=AWS_STORAGE_BUCKET_NAME).search(
        "Contents[?Size >`0`][]"
    )
    # A page without "Contents" makes the JMESPath search yield None.
    file_list = [
        obj for obj in listed_objects
        if obj is not None and folder in obj["Key"]
    ]

    local_paths = []
    try:
        for obj in file_list:
            file_name = obj["Key"].split("/")[-1]
            local_path = os.path.join(download_dir, file_name)
            local_paths.append(local_path)
            body = my_bucket.Object(obj["Key"]).get()["Body"]
            with open(local_path, "wb") as w:
                w.write(body.read())
        make_zipfile(os.path.join(temp_zipp.name, zip_name), download_dir)
    finally:
        # Always remove the downloaded copies, even if the download or the
        # zipping failed. Two keys with the same base name map to one local
        # file, so a path may already be gone when it is visited again.
        for local_path in local_paths:
            try:
                os.remove(local_path)
            except FileNotFoundError:
                pass

    return send_from_directory(temp_zipp.name, zip_name, as_attachment=True)
我使用 https://zipkit.io 解决了这个问题,取得了巨大成功,我的应用程序也托管在 Heroku 上。