在执行任务期间从Celery工作人员捕获失败的任务

问题描述 投票:2回答:1

我当前在celery中使用delay method(与RabbitMQ)执行异步任务。这些任务分配给8个工作人员,在这些任务中,我正在执行数据库插入操作。

有时,某些任务失败并且由于超时(或)Json解码错误而导致数据库插入未发生,我想捕获那些特定任务失败了。以下是我在Django Project中使用的Celery代码段。

views.py

def celery_url_filtering(request):
    for each_data in dataset:
        #each_data is a Json object
        res = result.delay(each_data)
    while(res.status == 'PENDING'):
         pass
    return JsonResponse({'ok':'Success'})

tasks.py

@app.task
def result(dataeach_data:
    # Parse each_data  and do data insertion here
    return "Something"

[请提出解决方法以捕获列表中失败的任务。

python-3.x asynchronous rabbitmq celery django-celery
1个回答
0
投票

从您的担心中我可以理解的是,您想研究“ PENDING”或“ FAILURE”任务,然后重试/应用某些应用程序逻辑。

如果是这样,您可以根据需要按固定的时间表(每天/每小时等)运行cron。此cron作业可以根据您的计划捕获过去一天/小时内失败的任务。

您可以使用django-celery-beat设置cron作业,并使用django-celery-results使用Django ORM存储celery任务结果。

例如,您可以执行像这样的芹菜任务

from celery import shared_task
from django_celery_results.models import TaskResult

**tasks.py**
@shared_task(name="failed_task_cron", bind=True)
def failed_task_cron(self, **kwargs):
    """
    Celery task to run on daily schedule to do something with the failed tasks
    """
    tasks = TaskResult.objects.filter(status='FAILURE')
    # tasks is a queryset of all the failed tasks
    # Perform application logic

您可以在芹菜设置中为上述任务设置cron

from celery.schedules import crontab

# ...

CELERY_BEAT_SCHEDULE = {
    "failed_task_cron": {
        "task": "path/to/tasks.failed_task_cron",
        "schedule": crontab(hour=1)           # Runs every hour
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.