我当前在celery中使用delay method(与RabbitMQ)执行异步任务。这些任务分配给8个工作人员,在这些任务中,我正在执行数据库插入操作。
有时,某些任务失败并且由于超时(或)Json解码错误而导致数据库插入未发生,我想捕获那些特定任务失败了。以下是我在Django Project中使用的Celery代码段。
views.py
def celery_url_filtering(request):
for each_data in dataset:
#each_data is a Json object
res = result.delay(each_data)
while(res.status == 'PENDING'):
pass
return JsonResponse({'ok':'Success'})
tasks.py
@app.task
def result(dataeach_data:
# Parse each_data and do data insertion here
return "Something"
[请提出解决方法以捕获列表中失败的任务。
从您的担心中我可以理解的是,您想研究“ PENDING”或“ FAILURE”任务,然后重试/应用某些应用程序逻辑。
如果是这样,您可以根据需要按固定的时间表(每天/每小时等)运行cron。此cron作业可以根据您的计划捕获过去一天/小时内失败的任务。
您可以使用django-celery-beat设置cron作业,并使用django-celery-results使用Django ORM存储celery任务结果。
例如,您可以执行像这样的芹菜任务
from celery import shared_task
from django_celery_results.models import TaskResult
**tasks.py**
@shared_task(name="failed_task_cron", bind=True)
def failed_task_cron(self, **kwargs):
"""
Celery task to run on daily schedule to do something with the failed tasks
"""
tasks = TaskResult.objects.filter(status='FAILURE')
# tasks is a queryset of all the failed tasks
# Perform application logic
您可以在芹菜设置中为上述任务设置cron
from celery.schedules import crontab
# ...
CELERY_BEAT_SCHEDULE = {
"failed_task_cron": {
"task": "path/to/tasks.failed_task_cron",
"schedule": crontab(hour=1) # Runs every hour
}
}