我在将函数传递给 celery 任务时遇到一些麻烦,并且无法传递其结果,因为加载需要很长时间并且会导致 api 请求超时
@celery.task(name='task_name', bind=True)
def celery_task(self, headers, data_function):
data = data_function()
# more code
class DataClass(generics.ListCreateAPIView):
def get(self, request, *args, **kwargs):
send_excel_as_email.apply_async(args=[headers, self.get_data])
def get_data(self):
queryset = self.filter_queryset(self.get_queryset())
serializer_class = self.get_serializer_class()
data = serializer_class(queryset, context=self.get_serializer_context(), many=True).data
# more code
所以基本上 celery_task 是一个可重用的任务,将在许多其他类中使用,每个类都有自己的 query_set 和序列化器,这就是为什么我需要找到一种方法来调用 celery 任务本身内部的
get_data
函数
我尝试使用pickl,但它抛出了无法序列化函数的错误