我正在尝试撰写回复以启用报告下载。我通过数据库查询检索相关数据,并将其存储在内存中,以避免在服务器上生成不必要的文件。我当前的挑战是将 CSV 文件保存在 zip 文件中。遗憾的是,我在这个问题上花了几个小时,但没有找到令人满意的解决方案,而且我不确定我可能犯的具体错误。相关 CSV 文件大小约为 40 MB。
这是我的 FastAPI 代码。我已成功将 CSV 文件保存在本地,并且其中的所有数据都是准确的。我还成功地正确创建了包含 CSV 的 zip 文件。但是,FastAPI 响应的行为并不符合预期。下载后,它返回我的 zip 并出现错误
ZIP 文件已损坏,或者存档意外结束。
from fastapi import APIRouter, Depends
from sqlalchemy import text
from libs.auth_common import veryfi_admin
from libs.database import database
import csv
import io
import zipfile
from fastapi.responses import Response
router = APIRouter(
tags=['report'],
responses={404: {'description': 'not found'}}
)
@router.get('/raport', dependencies=[Depends(veryfi_admin)])
async def get_raport():
query = text(
"""
some query
"""
)
data_de = await database.fetch_all(query)
csv_buffer = io.StringIO()
csv_writer_de = csv.writer(csv_buffer, delimiter=';', lineterminator='\n')
csv_writer_de.writerow([
"id", "name", "date", "stock",
])
for row in data_de:
csv_writer_de.writerow([
row.id,
row.name,
row.date,
row.stock,
])
csv_buffer.seek(0)
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
zip_file.writestr("data.csv", csv_buffer.getvalue())
response = Response(content=zip_buffer.getvalue())
response.headers["Content-Disposition"] = "attachment; filename=data.zip"
response.headers["Content-Type"] = "application/zip"
response.headers["Content-Length"] = str(len(zip_buffer.getvalue()))
print("CSV Buffer Contents:")
print(csv_buffer.getvalue())
return response
这也是vue3代码
const downloadReport = () => {
loading.value = true;
instance
.get(`/raport`)
.then((res) => {
const blob = new Blob([res.data], { type: "application/zip" });
const link = document.createElement("a");
link.href = window.URL.createObjectURL(blob);
link.download = "raport.zip";
link.click();
loading.value = false;
})
.catch(() => (loading.value = false));
};
<button @click="downloadReport" :disabled="loading">
Download Report
</button>
感谢您在我在此平台上解决第一个问题时的理解。
这是一个如何创建多个
csv
文件,然后将它们添加到 zip
文件,最后将 zip
文件返回给客户端的工作示例。此答案利用了之前在以下答案中讨论的代码和概念:this、this 和 this。因此,我建议查看这些答案以获取更多详细信息。
此外,由于
zipfile
模块的操作是同步,因此您应该使用普通 def
而不是 async def
来定义端点,除非您使用了其他也提供 async
API 的库,或者您需要对于某些协程来说,到 await
,在这种情况下,我建议在外部 zipfile
或 ThreadPool
中运行 ProcessPool
操作。请查看这个答案以获取相关解决方案,以及有关 async
/ await
的详细信息以及 FastAPI 如何处理 async def
和普通 def
API 端点。
此外,如果数据已经加载到内存中,则实际上不需要使用
StreamingResponse
,如下例所示。您应该返回自定义 Response
(请参阅下面的示例,以及 this、this 和 this 了解更多详细信息)。
请注意,出于演示目的,下面的示例循环遍历
list
的 dict
对象来写入 csv
数据,以便您更轻松地使其适应数据库查询数据情况。否则,也可以使用 csv.DictWriter()
及其 writerows()
方法(如这个答案中所示)来写入数据,而不是循环遍历 list
。
from fastapi import FastAPI, HTTPException, BackgroundTasks, Response
import zipfile
import csv
import io
app = FastAPI()
fake_data = [
{
"Id": "1",
"name": "Alice",
"age": "20",
"height": "62",
"weight": "120.6"
},
{
"Id": "2",
"name": "Freddie",
"age": "21",
"height": "74",
"weight": "190.6"
}
]
def create_csv(data: list):
try:
s = io.StringIO()
writer = csv.writer(s, delimiter='\t')
keys = data[0].keys()
writer.writerow(keys)
for row in data:
writer.writerow([row['Id'], row['name'], row['age'], row['height'], row['weight']])
s.seek(0)
return s.getvalue().encode('utf-16')
except:
raise HTTPException(detail='There was an error processing the data', status_code=400)
finally:
s.close()
@app.get('/')
def get_data(background_tasks: BackgroundTasks):
zip_buffer = io.BytesIO()
zip_file = zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED)
try:
for i in range(5):
zip_info = zipfile.ZipInfo(f'data_{i}.csv')
csv_data = create_csv(fake_data)
zip_file.writestr(zip_info, csv_data)
except:
raise HTTPException(detail='There was an error processing the data', status_code=400)
finally:
zip_file.close()
zip_buffer.seek(0)
headers = {"Content-Disposition": "attachment; filename=files.zip"}
background_tasks.add_task(zip_buffer.close)
return Response(zip_buffer.getvalue(), headers=headers, media_type="application/x-zip-compressed")