Python / Celery:杀死父任务时如何杀死子任务?

问题描述 投票:3回答:2

语境

我创建了一个Django应用程序,该应用程序正在调用celery任务,该任务又会生成其他任务并等待它们完成。

这是工作流程:

1)python / django的主要代码在后台启动芹菜任务

2)芹菜任务处理一些代码,然后启动一组芹菜任务,等待它们准备就绪

3)该组中的每个任务然后以相同的方式生成另一组子任务并等待它们完成

[它很好用(尽管我是一个乞讨者,可能执行起来很差),但是现在,如果我终止了以乞讨开始的主要芹菜任务,现在我希望能够终止每个子进程。

到目前为止我有什么

我使用一个简单的父任务重生了多个子任务来重新创建情况,并且我修改了celery Task类的“ on_failure”方法以在失败时杀死它的孩子。

Tasks.py

from celery import Celery, group,Task, result
from celery.signals import task_revoked
import time
from pprint import pprint
application = Celery('tasks',backend='amqp://',broker='amqp://guest@localhost//')


class MyTask(Task):
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print(self.AsyncResult(task_id).children[0].revoke(terminate=True,signal='SIGTERM'))
        print('{0!r} failed: {1!r}'.format(task_id, exc))

@application.task(base=MyTask)
def childTask():
    while True:
        time.sleep(10)
        print("Message de la tache enfant")
        continue

@application.task(base=MyTask)
def parentTask(pra_id = None):
    child_tasks = []
    print("Lancement tache mère")
    child_tasks.append(childTask.s())
    child_tasks.append(childTask.s())
    child_tasks.append(childTask.s())
    tasks = group(child_tasks)
    tasks.apply_async()

    time.sleep(15)
    raise KeyError

main.py

from tasks import parentTask

parent1 = parentTask.delay(pra_id = 10)
parent2 = parentTask.delay(pra_id = 20)

[当代码引发错误时,父任务被成功杀死,其子任务也被成功杀死,这就是我想要的。

我需要什么

我需要能够从django应用程序中手动终止父任务。

这是通过检查celery工人并通过搜索其参数找到我的任务而完成的,此操作已成功完成,但是,当我手动找到该celery任务后,它不会终止由该任务产生的子任务这就是我所需要的。

到目前为止我尝试过的事情

我已尝试创建由“吊销”信号触发的功能

http://docs.celeryproject.org/en/latest/userguide/signals.html#task-revoked

将在撤销任务时执行。

捕获信号有效(我可以在撤消任务时执行一些代码),但无法使用与上述“ On_failure”方法相同的代码来杀死孩子的任务。

问题

发送到该函数的Request对象的确包含我的父Task,但是当该类的“ children”属性应包含一个包含子任务的GroupResult对象时,该属性为空。

python django celery multiprocess
2个回答
0
投票

不确定这是否对您有帮助,但是我发现可以正常工作的是将每个子任务ID存储在Redis或创建时的某些数据库中,并将它们与pipeline_id关联。然后,如果需要终止父任务,那么也可以终止列表中存储的所有子任务。

result.revoke(terminate=True)

subtask_results = get_subtask_status(pipeline_id) #Custom Function

for subtask_result in subtask_results:
    subtask_result.revoke(terminate=True)

0
投票

默认情况下,Celery Task对象的值为trail = True,这意味着它将存储其子级。因此,您可以使用request.children或(Async)Result的children属性来获取它。一旦有了子task_id的列表,撤销这些任务就很简单了。

© www.soinside.com 2019 - 2024. All rights reserved.