I have a fairly large block of code implementing an API for file transfer. To save memory, the code relies on a temporary directory while it converts the data and writes it to disk piece by piece. I have access to this temp_dir and stream the (zipped) data, but doing so relies on reading the whole zip file into memory, which defeats the point of streaming.

I have an endpoint (stripped back to its skeleton):
# data endpoint
@router.get("/fetch_data/{cadence}/{code}")
async def get_data(
    *args
):
    try:
        data = fetch_data(*args)
        return data
    except ValueError as e:
        return {"error": str(e)}
fetch_data() eventually calls a function containing the code below, which simply takes the list of files in storage (file_p), converts them to the new format, and saves them to temp_dir. self.data is what gets returned and fed to the API endpoint. The function zip_data (further below) is then called:
with tempfile.TemporaryDirectory() as temp_dir:
    filepaths = []
    for F in file_p:  # iterate over files
        # get data as df and format
        df = fr.read_iaga(file_path=F)[0]
        df = self.process_text_df(cadence, input, df)
        self.convert_to_file(self.format, cadence=cadence,
                             df=df, zip_json=zip_json)
        file_path = os.path.join(temp_dir, self.filename)
        with open(file_path, open_mode) as file:
            file.write(self.data)
        filepaths.append(file_path)
    self.data = zip_data(self.obs_code, filepaths)
zip_data() compresses the data ready for delivery and returns a StreamingResponse; however, it relies on io.BytesIO(), so everything is held in memory, which makes the streaming pointless:
import io
import os
import zipfile

from fastapi.responses import StreamingResponse


def zip_data(obs_code, file_p):
    """
    Takes files and compresses them into a zip for output.

    Parameters
    ----------
    obs_code: STR
        3-letter observatory code.
    file_p: LIST
        List of filepath strings to zip.

    Returns
    -------
    data: FastAPI Response object.
        API can return this response and the endpoint
        will download the file.
    """
    zip_filename = f"WDC_{obs_code.upper()}.zip"
    s = io.BytesIO()
    zf = zipfile.ZipFile(s, "w", compression=zipfile.ZIP_DEFLATED)
    for fpath in file_p:
        _, fname = os.path.split(fpath)
        with open(fpath, 'rb') as file:
            zf.writestr(fname, file.read(), compress_type=zipfile.ZIP_DEFLATED)
    zf.close()
    s.seek(0)
    return StreamingResponse(
        iter([s.getvalue()]),
        media_type="application/x-zip-compressed",
        headers={'Content-Disposition': f'attachment;filename={zip_filename}'}
    )
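Once the zip exists on disk, one way to keep memory bounded is to hand StreamingResponse a generator that reads the file in fixed-size chunks instead of iter([s.getvalue()]). A minimal, framework-free sketch of that idea (the names iter_file and chunk_size are mine, not from the codebase):

```python
import os
import tempfile
import zipfile


def iter_file(path, chunk_size=64 * 1024):
    """Yield the file at `path` in fixed-size binary chunks."""
    with open(path, "rb") as fh:
        while chunk := fh.read(chunk_size):
            yield chunk


# Quick demonstration against a throwaway zip file:
with tempfile.TemporaryDirectory() as tmp:
    zip_path = os.path.join(tmp, "demo.zip")
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        zf.writestr("a.bin", os.urandom(8192))  # incompressible payload
    with open(zip_path, "rb") as fh:
        original = fh.read()
    chunks = list(iter_file(zip_path, chunk_size=1024))
    data = b"".join(chunks)

assert data == original
```

StreamingResponse(iter_file(zip_path)) would then send one chunk at a time, so only roughly chunk_size bytes sit in memory at once rather than the whole archive.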
This is a large amount of code spanning multiple scripts, so I've tried to provide everything needed without being overwhelming, but essentially refactoring the code would be very annoying. Can I get at these temporary files somehow?

I think I've found a solution, but perhaps someone can confirm whether the whole zip is still being read into memory? I'm wary of the use of open()? I make use of shutil and move the data into a zip archive like so:
import os
import shutil
import tempfile

from fastapi.responses import StreamingResponse


def make_archive(source, destination):
    """
    Use shutil to archive data into a zip.
    """
    base = os.path.basename(destination)
    name, fmt = base.rsplit('.', 1)  # e.g. "ABC.zip" -> ("ABC", "zip")
    archive_from = os.path.abspath(source)
    shutil.make_archive(name, fmt, archive_from)
    shutil.move(f"{name}.{fmt}", destination)
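One side effect worth knowing: shutil.make_archive(name, fmt, archive_from) writes "<name>.<fmt>" into the current working directory, and only the shutil.move puts it at destination. A self-contained check of the helper (the body is repeated here so the snippet runs on its own; sample.txt is a made-up filename):

```python
import os
import shutil
import tempfile
import zipfile


def make_archive(source, destination):
    # Same helper as above: archive `source` into a zip at `destination`.
    base = os.path.basename(destination)
    name, fmt = base.rsplit(".", 1)
    archive_from = os.path.abspath(source)
    shutil.make_archive(name, fmt, archive_from)  # writes into the CWD first
    shutil.move(f"{name}.{fmt}", destination)


with tempfile.TemporaryDirectory() as src, tempfile.TemporaryDirectory() as dst:
    with open(os.path.join(src, "sample.txt"), "w") as fh:
        fh.write("hello")
    out = os.path.join(dst, "out.zip")
    make_archive(src, out)
    with zipfile.ZipFile(out) as zf:
        names = zf.namelist()

assert "sample.txt" in names
```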
...and then simply call zip_data() inside my temporary-directory block:
def zip_data(obs_code, temp_dir):
    """
    Takes files and compresses them into a zip for output.

    Parameters
    ----------
    obs_code: STR
        3-letter observatory code.
    temp_dir: STR
        Path to temporary directory.

    Returns
    -------
    FastAPI Response object.
        API can return this response and the endpoint
        will download the file.
    """
    # Create a zip archive from the temporary directory
    zip_filename = os.path.join(temp_dir, f"{obs_code.upper()}.zip")
    make_archive(temp_dir, zip_filename)
    # Stream the zip file as a response
    response = StreamingResponse(
        open(zip_filename, 'rb'),
        media_type="application/zip",
        headers={"Content-Disposition":
                 f"attachment; filename={obs_code.upper()}.zip"}
    )
    return response
So the temporary files get zipped and streamed, and the temporary files are guaranteed to be deleted if anything goes wrong. Does anyone think extra cleanup is needed? It seems to work, but can someone confirm whether the whole zip ends up in memory, because we don't want that!
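On the cleanup side, one detail can be sketched directly (POSIX-specific, and not from the original code): because open(zip_filename, 'rb') runs before the TemporaryDirectory context exits, the open handle keeps the bytes readable on Linux/macOS even after the directory and its path have been removed:

```python
import os
import tempfile

tmp = tempfile.TemporaryDirectory()
path = os.path.join(tmp.name, "payload.bin")
with open(path, "wb") as fh:
    fh.write(b"x" * 10)

handle = open(path, "rb")   # opened before cleanup, like the StreamingResponse case
tmp.cleanup()               # directory and path are gone...
assert not os.path.exists(path)
assert handle.read() == b"x" * 10  # ...but the open handle still reads the data
handle.close()
```

On Windows the cleanup would instead fail while the file is open. If explicit cleanup after the response is preferred, StreamingResponse also accepts a background= argument (e.g. starlette.background.BackgroundTask(shutil.rmtree, temp_dir)) that runs only after the body has been fully sent.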