django-channels / celery:如何跟踪任务列表的进度?

问题描述 投票:2回答:1

我“成功地”从视图向客户端发送一条消息,其中包含一组任务(不是实际的芹菜组)的状态。问题是:这实际上忽略了是否所有任务都是实际执行的。我试图添加一个回调(task.apply_async(link=)),但这也没有帮助。

任务本身并不需要花费很多时间,但我真的希望能够在实际执行任务时增加计数器:

if 'selected' in request.GET:
        selected_as_list = request.GET.getlist('selected')
        print(selected_as_list)
        searches = list(set([s.strip() for s in selected_as_list if s.strip()]))
        task_group = [refresh_func.s(str(user_profile.id), search, dont_auto_add=True) for search in searches]

        for i,task in enumerate(task_group):
            task.apply_async()
            Group(str(request.user.id)).send({"text": json.dumps({"tasks_completed": i+1,
                                                                  "task_id": "fb_import",
                                                                  "completed": True if i == len(task_group) -1 else False,
"total": len(task_group)})})

所以我将代码移出视图,并进入实际调用要完成的操作的同一个块。虽然这意味着我现在传递了许多参数,但这解决了最初的问题。但它提出了另一个:索引为“1”的任务可以在索引为“3”的任务之后完成,这显然会错误地更新计数器。

可以做些什么来解决这个问题?

django celery django-channels
1个回答
1
投票

如何生成一个后台线程,定期检查生成的任务的状态(如果你知道任务的ID,你可以获得这些状态)?

这个线程应该在Django服务器上运行(而不是在Celery任务中),因为这可能是你的django-channel处于活动状态的地方:如果你在任务中调用Group(...).send,它可能无法访问它(特别是因为通常芹菜工作者运行单独的流程/机器)

假设您在视图的.GET实现中生成任务。也许你可以在那里收集任务ID(它们在那里产生)并定期检查它们在一个线程中的状态(所以你不要阻止.GET响应)。

假设您生成任务的视图如下所示:

class Test(generic.TemplateView):
    template_name = 'stack_092.html'

    def get(self, request, *args, **kwargs):
        logger.info("Yep")
        task_group = [foo_task.s(i) for i in range(5)]
        logger.info("Task signatures created: %s", task_group)

        task_ids = [task.apply_async().task_id for task in task_group]
        logger.info("Tasks launched")
        th = threading.Thread(target=verify_task_ids, args=('request.user.id', task_ids))
        th.start()
        logger.info("Thread started")
        return super(Test, self).get(request, *args, **kwargs)

像这样的东西可能是线程的verify_task_ids目标函数:

def verify_task_ids(channel_group_id, task_ids):
    previous_finished_task_ids = set()
    finished_task_ids = set()
    logger.info("Verifying %s task_ids", len(task_ids))
    while len(finished_task_ids) < len(task_ids):
        finished_task_ids = set()
        for task_id in task_ids:
            if AsyncResult(task_id).ready():
                finished_task_ids.add(task_id)
        if finished_task_ids != previous_finished_task_ids:
            logger.info("%s new finished tasks", 
                        len(finished_task_ids) - len(previous_finished_task_ids))
        previous_finished_task_ids = finished_task_ids

在这个例子中,channel_group_id参数只是一个纯硬编码的字符串"request.user.id"。在您的情况下,您应该用登录到服务器的用户的实际request.user.id替换它,因为这是您的组ID。

你会看到,当一个新任务完成后,我只显示一条日志消息:

if finished_task_ids != previous_finished_task_ids:
        logger.info("%s new finished tasks", 
                    len(finished_task_ids) - len(previous_finished_task_ids))

这里是你可能应该调用的logger.info函数

if finished_task_ids != previous_finished_task_ids:
    Group(
        str(channel_group_id)
    ).send(
        {
            "text": json.dumps({
                "tasks_completed": len(finished_task_ids),
                "task_id": "fb_import",
                "completed": len(finished_task_ids) == len(task_ids),
             })
         }
     )

我不太了解(呃...什么,而不是......关于django-channels)所以我不确定这个解决方案是否有效,但也许值得一试?

© www.soinside.com 2019 - 2024. All rights reserved.