Django Rest Framework从后台模型导出csv数据

问题描述 投票:2回答:3

当前,我正在使用DRF版本'3.9.2'。我正在以同步方式从模型导出一些CSV文件。代码块的示例说明如下:

urls.py

from django.urls import path

# URL configuration: route the csv export endpoint to its class-based view.
urlpatterns = [
    path(route='api/users-csv-export/', view=UsersExportAsCSV.as_view()),
]

views.py

from rest_framework.views import APIView

def get_users_data():
    """
    Collect everything needed to export the users as csv
    :return: tuple of (queryset, fields, titles, file_name), in that order
    """
    # the same field list drives both the query and the csv columns
    fields = ['first_name', 'last_name', 'created_at', 'email', 'gender', 'date_of_birth']
    titles = ['First Name', 'Last Name', 'Date Added', 'Email', 'Gender', 'Date of Birth']
    queryset = User.objects.only(*fields)
    return queryset, fields, titles, 'users'

class UsersExportAsCSV(APIView):
    """API view that builds the users csv export inside the request itself."""

    def get(self, request):
        """
        Run the csv export synchronously and return its result
        :param request: incoming request (not inspected)
        :return: whatever export_to_csv returns for the users data
        """
        queryset, fields, titles, file_name = get_users_data()
        return export_to_csv(queryset=queryset, fields=fields, titles=titles, file_name=file_name)

utils.py

def export_to_csv(queryset, fields, titles, file_name):
    """
    will export the model data in the form of csv file
    :param queryset: queryset that need to be exported as csv; its ``model`` is only
        consulted when ``fields`` is empty
    :param fields: fields of a model that will be included in csv; when empty, every
        field listed in the model's ``_meta.fields`` is exported
    :param titles: title for each cell of the csv record; replaced by ``fields`` when
        empty, and ignored when ``fields`` is empty
    :param file_name: the exported csv file name (``.csv`` is appended here)
    :return: HttpResponse holding the csv content, marked as an attachment
    """
    if fields:
        headers = fields
        titles = titles or headers
    else:
        # no explicit selection: fall back to the fields declared on the model
        headers = [field.name for field in queryset.model._meta.fields]
        titles = headers

    response = HttpResponse(content_type='text/csv')
    # force download
    response['Content-Disposition'] = 'attachment; filename={}.csv'.format(file_name)

    # the csv writer emits its rows directly into the response object
    writer = csv.writer(response)

    # Writes the title for the file
    writer.writerow(titles)

    # write data rows
    for item in queryset:
        writer.writerow([nested_getattr(item, field) for field in headers])
    return response

随着数据量的增长,请求-响应周期变得越来越重。为了防止此请求阻塞其他请求,线程处理与异步任务应该选择哪一种?关于优化请求-响应周期,有什么好主意吗?

这里用户必须等到整个导出工作完成并下载完成。这里的理想结果是,每当用户访问URL时,立即响应“正在生成文件”并在后台生成并下载文件。

将非常感谢您提供有关此主题的任何帮助。

python django django-rest-framework async-await python-asyncio
3个回答
2
投票
我认为您可以使用Celery(http://www.celeryproject.org/)。您可以用“shared_task”在后台完成生成csv文件的任务,任务完成后将文件保存到表中(例如:DownloadFileModel),之后即可下载。当前视图通过重定向到用于下载文件的DetailView(对应一条文件字段暂时为空的DownloadFileModel记录)来响应;如果文件尚未准备就绪,只需给出说明,提示用户等待文件生成完成(生成后再把文件赋给该记录)。

1
投票
我认为,您可以使用celery运行异步任务,它将为您提供大量功能,以显示进度以及任务何时完成。

# https://pypi.org/project/django-celery/
import csv
import os
import tempfile

from celery import shared_task
from rest_framework.response import Response
from rest_framework.views import APIView


def get_users_data():
    """
    Collect the user rows and the csv layout for the background export
    :return: tuple of (rows, fields, titles, file_name); rows is a plain list of
        value tuples ordered like fields
    """
    fields = ['first_name', 'last_name', 'created_at', 'email', 'gender', 'date_of_birth']
    titles = ['First Name', 'Last Name', 'Date Added', 'Email', 'Gender', 'Date of Birth']
    # evaluated into a list here because the rows are handed to a task, not iterated in the view
    rows = list(User.objects.values_list(*fields))
    file_name = 'users'
    return rows, fields, titles, file_name


@shared_task
def export_to_csv(queryset, fields, titles, file_name, output_dir=None):
    """
    will export the given rows in the form of csv file, outside the request cycle
    :param queryset: already evaluated rows; each row is a sequence of values ordered
        like fields (a model queryset is NOT expected here)
    :param fields: field names the row values correspond to; used as the title row
        when titles is empty
    :param titles: title for each cell of the csv record
    :param file_name: the exported csv file name (``.csv`` is appended here)
    :param output_dir: directory the file is written to; defaults to the system
        temporary directory
    :return: path of the written csv file (this becomes the task result)
    """
    # NOTE(review): the rows travel through Celery's task serializer - confirm that the
    # date/datetime values survive it with the project's Celery configuration.
    file_path = os.path.join(output_dir or tempfile.gettempdir(), '{}.csv'.format(file_name))
    with open(file_path, 'w', newline='', encoding='utf-8') as csv_file:
        writer = csv.writer(csv_file)
        # Writes the title for the file
        header_row = titles or fields
        if header_row:
            writer.writerow(header_row)
        # write data rows; they are plain value sequences, so no attribute lookup is needed
        writer.writerows(queryset)
    return file_path


class UsersExportAsCSV(APIView):
    """API view that queues the users csv export and returns the task id."""

    def get(self, request):
        """
        Queue the export task and answer without waiting for the file
        :param request: incoming request (not inspected)
        :return: Response with the id of the queued task and status 202
        """
        rows, fields, titles, file_name = get_users_data()
        async_result = export_to_csv.delay(queryset=rows, fields=fields, titles=titles, file_name=file_name)
        # a view has to return a response object; the client keeps the id to look up the result later
        return Response({'task_id': async_result.id}, status=202)

using the task id you can get the result of that
另一种方法是使用django-channels,它使用套接字连接,这样您就无需为了检查任务是否完成而反复发送轮询请求。

1
投票
为了解决我的问题,我使用了python线程模块,并使用Redis缓存服务器存储了文件记录。

首先,

GET方法会检查缓存中是否已有导出文件的记录;如果有,则在响应中返回该文件的URL。POST方法会生成文件并将其写入media/temp,然后把记录存入缓存。

在代码块中进行的更改很少,如下所述:

views.py

import threading

from django.core.exceptions import EmptyResultSet
from rest_framework import status
from rest_framework.response import Response
from rest_framework.views import APIView


def get_users_data():
    """
    Collect the queryset and csv layout for the users export
    :return: tuple of (queryset, fields, titles); the file name is owned by the view,
        so it is not part of the result (the name was never defined in this function)
    """
    fields = ['first_name', 'last_name', 'created_at', 'email', 'gender', 'date_of_birth']
    titles = ['First Name', 'Last Name', 'Date Added', 'Email', 'Gender', 'Date of Birth']
    queryset = User.objects.only(*fields)
    return queryset, fields, titles


class TripHistoryExportAsCSV(APIView):
    """API view: POST starts the csv generation in a thread, GET reports the file url."""

    # file_name is handed to export_to_csv; both values are handed to check_export_data_in_cache
    file_name = "users_all"
    file_extension = 'csv'

    def post(self, request):
        """
        Start the csv export in a separate thread and answer immediately
        :param request: incoming request (not inspected)
        :return: Response with a status message
        """
        try:
            queryset, fields, titles = get_users_data()
            x = threading.Thread(target=export_to_csv, args=(queryset, fields, titles, self.file_name))
            x.start()
            return Response({
                'message': 'CSV file is generating'
            })
        except EmptyResultSet:
            # NOTE(review): nothing in the try body visibly evaluates the queryset, so it is
            # unclear when EmptyResultSet would be raised here - confirm this handler is needed.
            return Response({
                'message': 'Can not create CSV file'
            }, status=status.HTTP_200_OK)

    def get(self, request):
        """
        Report the url of the generated file when a record for it is cached
        :param request: incoming request (not inspected)
        :return: Response with the file url, or a 204 response asking for a new generation
        """
        data = check_export_data_in_cache(self.file_name, self.file_extension)
        if data:
            return Response({
                'url': data.get('report_url')
            })
        else:
            return Response({
                'message': 'Generation of new files required'
            }, status=status.HTTP_204_NO_CONTENT)

utils.py

def nested_getattr(obj, attribute, split_rule='__'):
    """
    This function is responsible for getting the nested record from the given obj parameter
    :param obj: whole item without splitting
    :param attribute: attribute path; the names are joined by split_rule
    :param split_rule: separator between the attribute names in the path
    :return: the value at the end of the path, or the first falsy value met on the way
    """
    for attr in attribute.split(split_rule):
        # a falsy intermediate value (e.g. None) ends the walk and is returned as-is
        if not obj:
            break
        obj = getattr(obj, attr)
    return obj


def export_to_csv(queryset, fields, titles, file_name):
    """
    will export the model data in the form of csv file under MEDIA_ROOT/temp
    :param queryset: queryset that need to be exported as csv; its ``model`` is only
        consulted when ``fields`` is empty
    :param fields: fields of a model that will be included in csv; when empty, every
        field listed in the model's ``_meta.fields`` is exported
    :param titles: title for each cell of the csv record; replaced by ``fields`` when
        empty, and ignored when ``fields`` is empty
    :param file_name: the exported csv file name (``.csv`` is appended here)
    :return: None; the file is written and a cache record is stored for it
    """
    import os
    from yatruadminbackend.settings import MEDIA_ROOT

    if fields:
        headers = fields
        titles = titles or headers
    else:
        headers = [field.name for field in queryset.model._meta.fields]
        titles = headers

    file_path = os.path.join(MEDIA_ROOT, f'temp/{file_name}.csv')
    # open() fails when the target directory is missing, so create it on demand
    os.makedirs(os.path.dirname(file_path), exist_ok=True)
    # explicit encoding keeps the output independent of the platform default
    with open(file_path, 'w', newline='', encoding='utf-8') as file:
        writer = csv.writer(file)
        # Writes the title for the file
        writer.writerow(titles)
        # write data rows
        for item in queryset:
            writer.writerow([nested_getattr(item, field) for field in headers])

    # stored only after the file is closed, so the cached url never points at a partial file
    set_cache_for_export_file(file_name, 'csv')


def set_cache_for_export_file(filename, extension):
    """
    Store a record describing a generated export file in the cache
    :param filename: export file name without extension
    :param extension: export file extension without the dot
    :return: None
    """
    export_file_name = f'{filename}_{extension}'
    record_in_cache = {
        'key': export_file_name,
        # built from the extension argument instead of a fixed '.csv'
        'report_url': f'{BACKEND_URL}media/temp/{filename}.{extension}',
        'generated_on': timezone.now(),
    }
    # NOTE(review): 300 is presumably the cache timeout in seconds - confirm against the cache backend
    cache.set(export_file_name, record_in_cache, 300)


def check_export_data_in_cache(file_name, file_extension):
    """
    Look up the cache record of a generated export file
    :param file_name: export file name without extension
    :param file_extension: export file extension without the dot
    :return: the cached record, or None when nothing truthy is stored under the key
    """
    cache_key = f'{file_name}_{file_extension}'
    # one lookup only: a second get() could miss if the entry expired in between
    return cache.get(cache_key) or None
© www.soinside.com 2019 - 2024. All rights reserved.