目前,我使用的 DRF 版本是 '3.9.2'。我正在以同步方式从模型导出一些 CSV 文件。示例代码如下:
urls.py
from django.urls import path
# URL routes: expose the users CSV export view under the API prefix.
urlpatterns = [
    path(
        'api/users-csv-export/',
        UsersExportAsCSV.as_view(),
    ),
]
views.py
from rest_framework.views import APIView
def get_users_data():
    """
    Collect everything needed to export the users as CSV.

    :return: a 4-tuple ``(queryset, fields, titles, file_name)`` where the
        queryset is limited with ``only()`` to the exported fields
    """
    column_names = ['first_name', 'last_name', 'created_at', 'email', 'gender', 'date_of_birth']
    column_titles = ['First Name', 'Last Name', 'Date Added', 'Email', 'Gender', 'Date of Birth']
    users = User.objects.only(*column_names)
    return users, column_names, column_titles, 'users'
class UsersExportAsCSV(APIView):
    """API view that builds the users CSV export inside the request itself."""

    def get(self, request):
        """
        Export the users and hand back the result of ``export_to_csv``.

        :param request: the incoming request (not inspected here)
        :return: whatever ``export_to_csv`` returns
        """
        queryset, fields, titles, file_name = get_users_data()
        return export_to_csv(
            queryset=queryset,
            fields=fields,
            titles=titles,
            file_name=file_name,
        )
utils.py
def export_to_csv(queryset, fields, titles, file_name):
    """
    Build a downloadable CSV response from a queryset.

    :param queryset: queryset whose items are written as CSV rows
    :param fields: attribute paths exported for every item; when empty, the
        names from ``queryset.model._meta.fields`` are used instead
    :param titles: labels for the header row; when empty (or when ``fields``
        is empty) the exported field names are used as labels
    :param file_name: base name of the attachment, without the extension
    :return: an ``HttpResponse`` with ``text/csv`` content type
    """
    model = queryset.model

    if fields:
        columns = fields
        header_row = titles if titles else columns
    else:
        # No explicit selection: fall back to the fields declared on the model.
        columns = [model_field.name for model_field in model._meta.fields]
        header_row = columns

    response = HttpResponse(content_type='text/csv')
    # Mark the response as an attachment so that it is downloaded.
    response['Content-Disposition'] = 'attachment; filename={}.csv'.format(file_name)

    # The response object is used directly as the csv writer's target.
    writer = csv.writer(response)
    writer.writerow(header_row)
    for item in queryset:
        writer.writerow([nested_getattr(item, column) for column in columns])
    return response
随着数据的增长,请求-响应周期变得越来越重。为了防止此请求阻塞其他请求,应该选择线程处理还是异步任务?关于优化请求-响应周期,有什么好主意吗?
目前,用户必须一直等到整个导出工作完成、文件下载完毕。理想的结果是:每当用户访问该 URL 时,立即返回“正在生成文件”的响应,然后在后台生成文件并提供下载。非常感谢您就此主题提供的任何帮助。
https://pypi.org/project/django-celery/
from celery import task
def get_users_data():
    """
    Collect everything needed to export the users as CSV from a task.

    :return: a 4-tuple ``(rows, fields, titles, file_name)`` where ``rows``
        is a list built from ``values_list()`` over the exported fields
    """
    fields = ['first_name', 'last_name', 'created_at', 'email', 'gender', 'date_of_birth']
    titles = ['First Name', 'Last Name', 'Date Added', 'Email', 'Gender', 'Date of Birth']
    rows = list(User.objects.values_list(*fields))
    return rows, fields, titles, 'users'
@task
def export_to_csv(queryset, fields, titles, file_name):
    """
    Write the given rows to a CSV file and return the path of that file.

    :param queryset: the rows to export; either a sequence of row
        sequences (e.g. the list built from ``values_list()``) or an
        iterable of objects read through ``nested_getattr``
    :param fields: attribute paths exported for every object; when empty and
        ``queryset`` exposes ``.model``, the model's field names are used
    :param titles: labels for the header row; when empty (or when ``fields``
        is empty) the exported field names are used as labels
    :param file_name: base name of the generated file, without the extension
    :return: path of the written CSV file
    """
    # Imported locally so the block does not rely on module-level imports.
    import os

    from django.conf import settings

    if fields:
        headers = fields
        titles = titles if titles else headers
    else:
        # A plain list of rows has no ``.model``; only a queryset can supply
        # default headers.
        model = getattr(queryset, 'model', None)
        headers = [field.name for field in model._meta.fields] if model is not None else []
        titles = headers

    # A task result must be something the caller can look up later, so the
    # CSV is written to disk instead of into an HttpResponse.
    # NOTE(review): MEDIA_ROOT is assumed to be the intended location - confirm.
    file_path = os.path.join(settings.MEDIA_ROOT, '{}.csv'.format(file_name))
    with open(file_path, 'w', newline='', encoding='utf-8') as csv_file:
        writer = csv.writer(csv_file)
        # Writes the title for the file
        writer.writerow(titles)
        # write data rows
        for item in queryset:
            if isinstance(item, (list, tuple)):
                # Rows that are already sequences are written as they are.
                writer.writerow(item)
            else:
                writer.writerow([nested_getattr(item, field) for field in headers])
    return file_path
class UsersExportAsCSV(APIView):
    """API view that queues the users CSV export instead of running it inline."""

    def get(self, request):
        """
        Queue the export task and answer immediately with its id.

        :param request: the incoming request (not inspected here)
        :return: a ``Response`` whose ``task_id`` identifies the queued task
        """
        # Imported locally so the block does not rely on module-level imports.
        from rest_framework.response import Response

        queryset, fields, titles, file_name = get_users_data()
        async_result = export_to_csv.delay(
            queryset=queryset,
            fields=fields,
            titles=titles,
            file_name=file_name,
        )
        # ``delay`` hands back an AsyncResult, which a view cannot return
        # as-is; expose its id so the client can ask for the outcome later.
        return Response({'task_id': async_result.id}, status=202)
Using the task id, you can then fetch the result of that task.
另一种方法是使用 django-channels,它通过套接字连接通知客户端,这样您就无需通过轮询请求来检查任务是否已完成。
首先,
GET 方法会检查缓存中是否存在已导出文件的记录;如果存在,则将该文件的 URL 作为响应返回。POST 方法会生成文件并将其写入 media/temp,然后把记录存入缓存。
对代码块所做的改动很少,如下所示:views.py
from rest_framework.views import APIView
def get_users_data():
    """
    Collect everything needed to export the users as CSV.

    :return: a 4-tuple ``(queryset, fields, titles, file_name)`` where the
        queryset is limited with ``only()`` to the exported fields
    """
    queryset = User.objects.only('first_name', 'last_name', 'created_at', 'email', 'gender', 'date_of_birth')
    fields = ['first_name', 'last_name', 'created_at', 'email', 'gender', 'date_of_birth']
    titles = ['First Name', 'Last Name', 'Date Added', 'Email', 'Gender', 'Date of Birth']
    # ``file_name`` was returned without ever being defined, so every call
    # raised NameError. The value matches the name the export view uses.
    file_name = 'users_all'
    return queryset, fields, titles, file_name
class TripHistoryExportAsCSV(APIView):
    """
    API view that generates the users CSV in a background thread.

    ``POST`` starts the generation and ``GET`` reports the file URL once a
    record for the export is found in the cache.
    """

    # Base name and extension of the export; both are passed to the cache
    # lookup in ``get``.
    file_name = "users_all"
    file_extension = 'csv'

    def post(self, request):
        """
        Start the CSV generation in a thread and answer immediately.

        :param request: the incoming request (not inspected here)
        :return: a ``Response`` with a status message
        """
        try:
            # Build the export inputs once; calling get_users_data() per
            # element constructed three separate querysets.
            queryset, fields, titles = get_users_data()[:3]
            worker = threading.Thread(
                target=export_to_csv,
                args=(queryset, fields, titles, self.file_name),
            )
            worker.start()
            return Response({
                'message': 'CSV file is generating'
            })
        except EmptyResultSet:
            # NOTE(review): a failure is reported with HTTP 200 - confirm
            # that clients rely on this before changing it.
            return Response({
                'message': 'Can not create CSV file'
            }, status=status.HTTP_200_OK)

    def get(self, request):
        """
        Report the URL of the generated file when it is recorded in the cache.

        :param request: the incoming request (not inspected here)
        :return: a ``Response`` with the file ``url``, or a message with
            HTTP 204 when no record is found
        """
        data = check_export_data_in_cache(self.file_name, self.file_extension)
        if data:
            return Response({
                'url': data.get('report_url')
            })
        # NOTE(review): a 204 response is not supposed to carry a body, so
        # this message may never reach the client - confirm the intent.
        return Response({
            'message': 'Generation of new files required'
        }, status=status.HTTP_204_NO_CONTENT)
utils.py
def nested_getattr(obj, attribute, split_rule='__'):
    """
    Resolve a delimited attribute path against an object.

    :param obj: object from which the lookup starts
    :param attribute: attribute path, e.g. ``'profile__city'``
    :param split_rule: separator between the segments of the path
    :return: the value at the end of the path, or the first falsy value met
        on the way (the walk stops there)
    """
    current = obj
    for segment in attribute.split(split_rule):
        # A falsy value ends the walk and is returned unchanged.
        if not current:
            return current
        current = getattr(current, segment)
    return current
def export_to_csv(queryset, fields, titles, file_name):
    """
    Write the queryset to ``MEDIA_ROOT/temp/<file_name>.csv`` and record it
    in the cache.

    :param queryset: queryset whose items are written as CSV rows
    :param fields: attribute paths exported for every item; when empty, the
        names from ``queryset.model._meta.fields`` are used instead
    :param titles: labels for the header row; when empty (or when ``fields``
        is empty) the exported field names are used as labels
    :param file_name: base name of the generated file, without the extension
    :return: None
    """
    import os
    from yatruadminbackend.settings import MEDIA_ROOT

    model = queryset.model
    if fields:
        headers = fields
        titles = titles if titles else headers
    else:
        headers = [field.name for field in model._meta.fields]
        titles = headers

    export_dir = os.path.join(MEDIA_ROOT, 'temp')
    # open() does not create missing directories, so make sure the target
    # directory exists before writing.
    os.makedirs(export_dir, exist_ok=True)
    export_path = os.path.join(export_dir, f'{file_name}.csv')

    # An explicit encoding keeps the output independent of the host locale.
    with open(export_path, 'w', newline='', encoding='utf-8') as file:
        writer = csv.writer(file)
        # Writes the title for the file
        writer.writerow(titles)
        # write data rows
        for item in queryset:
            writer.writerow([nested_getattr(item, field) for field in headers])

    # Record the export only once the file has been written and closed.
    set_cache_for_export_file(file_name, 'csv')
def set_cache_for_export_file(filename, extension):
    """
    Store a record describing a generated export file in the cache.

    The record is stored under ``<filename>_<extension>`` with a timeout
    argument of 300.

    :param filename: base name of the generated file, without the extension
    :param extension: extension of the generated file, e.g. ``'csv'``
    :return: None
    """
    export_file_name = f'{filename}_{extension}'
    record_in_cache = {
        'key': export_file_name,
        # Build the URL from the given extension; a hard-coded ".csv" would
        # point at the wrong file for any other extension.
        'report_url': f'{BACKEND_URL}media/temp/{filename}.{extension}',
        'generated_on': timezone.now(),
    }
    cache.set(export_file_name, record_in_cache, 300)
def check_export_data_in_cache(file_name, file_extension):
    """
    Look up the cache record of a generated export file.

    :param file_name: base name of the export file, without the extension
    :param file_extension: extension of the export file, e.g. ``'csv'``
    :return: the cached record when a truthy one is stored under
        ``<file_name>_<file_extension>``, otherwise ``None``
    """
    cache_key = f'{file_name}_{file_extension}'
    # Read the cache once: with two lookups the entry could expire between
    # the check and the read that is returned.
    record = cache.get(cache_key)
    return record if record else None