我对芹菜有新手经验。我已经写了许多任务,既有计划任务,也有一份延迟工作,但这就是事实。
我遇到了一个问题,我想创建一个任务以启动数千个较小的作业,以减轻队列长度和可能需要数小时才能完成的作业可能引起的任何问题。
当前应用程序依赖于来自外部API的信息。可以这么说,一个用户将其帐户与我集成的另一项服务关联起来,并且我想每天通过其外部帐户的更改来更新该用户的信息。
我有这样的预定工作
@app.task()
def refresh_accounts():
for account in Account.objects.all():
response = retrieve_account_info(account_id=account.id)
account.data = response.data
account.save()
-
我想要的是这样的东西
@app.task()
def kickoff_refresh():
for account in Account.objects.all()
refresh_account.delay(account_id=account.id)
@app.task()
def refresh_account(account_id=None):
account = Account.objects.get(id=account_id)
response = retrieve_account_info(account_id=account.id)
account.data = response.data
account.save()
我正在考虑的一种方法是将kickoff_refresh
和refresh_account
放在不同的队列中。 @app.task(queue=q1)
,@app.task(queue=q2)
...但是,我不知道是否有更好的方法。在芹菜上,在同一队列中的任务中调用任务似乎是不明智的做法-https://docs.celeryproject.org/en/latest/userguide/tasks.html#avoid-launching-synchronous-subtasks任务kickoff_refresh
是每隔几个小时运行一次的定期任务。
我很想听听对别人有用的东西。谢谢
from celery import group
@app.task()
def kickoff_refresh(account_id=None):
job = group(refresh_account.s(account_id=account.id) for account in Account.objects.all())()
@app.task()
def refresh_account(account_id=None):
account = Account.objects.get(id=account_id)
response = retrieve_account_info(account_id=account.id)
account.data = response.data
account.save()