目前,我有一个用django运行的芹菜批次,如下所示:
Celery.py:
from __future__ import absolute_import, unicode_literals
import os
import celery
from celery import Celery
from celery.schedules import crontab
import django
load_dotenv(os.path.join(os.path.dirname(os.path.dirname(__file__)), '.env'))
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'base.settings')
django.setup()
app = Celery('base')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
app.control.purge()
sender.add_periodic_task(30.0, check_loop.s())
recursion_function.delay() #need to use recursive because it need to wait for loop to finish(time can't be predict)
print("setup_periodic_tasks")
@app.task()
def check_loop():
.....
start = database start number
end = database end number
callling apis in a list from id=start to id=end
create objects
update database(start number = end, end number = end + 3)
....
@app.task()
def recursion_function(default_retry_delay=10):
.....
do some looping
....
#when finished, call itself again
recursion_function.apply_async(countdown=30)
我的目标是每当芹菜文件被编辑时,它将重新启动所有尚未执行的任务-删除排队的任务(我这样做是因为recursion_function
完成后将再次运行其自身,它是检查每个记录的工作数据库中的表格,因此我不必担心它会在中途停止)。
[check_loop
函数将调用具有分页的api以返回对象列表,我将通过表中的记录与之进行比较,如果匹配,则创建另一个模型的新自定义记录
我的问题是,当我清除所有消息时,当前正在运行的任务会中途停止还是继续运行?因为如果check_loop
函数在api列表中途停止循环,则它将再次运行循环,并且我将创建新的重复记录,我不想要
示例:
在check_loop()
的执行任务期间,它在中途创建了对象(在api列表中,从元素id = 2到id = 5),服务器重启->再次运行,现在check_loop()
从头开始运行(在api列表中,元素id == 2到id = 5),然后再次从该列表中创建对象(我不希望100%)
这是怎么运行的?我只需要确认
编辑:
https://docs.celeryproject.org/en/4.4.1/faq.html#how-do-i-purge-all-waiting-tasks
我添加了app.control.purge()
,因为当我重新启动时,recursion_function
在setup_periodic_tasks
中再次被调用,而来自recursion_function
的上一个recursion_function.apply_async(countdown=30)
也执行了,因此它自身也就倍增了
<< [是