使用 Celery ver.3.1.23,我尝试向 celerybeat 动态添加计划任务。我有一个 celery Worker 和一个 celerybeat 实例正在运行。
运行 task.delay() 触发标准 celery 任务可以正常工作。当我将计划的定期任务定义为配置中的设置时,celerybeat 会运行它。
但是我需要的是能够添加一个在运行时在指定的 crontab 运行的任务。将任务添加到持久调度程序后,celerybeat似乎没有检测到新添加的新任务。我可以看到 celery-schedule 文件确实有一个包含新任务的条目。
代码:
scheduler = PersistentScheduler(app=current_app, schedule_filename='celerybeat-schedule')
scheduler.add(name="adder",
task="app.tasks.add",
schedule=crontab(minute='*/1'),
args=(1,2))
scheduler.close()
当我跑步时:
print(scheduler.schedule)
我得到:
{'celery.backend_cleanup': <Entry: celery.backend_cleanup celery.backend_cleanup() <crontab: 0 4 * * * (m/h/d/dM/MY)>,
'adder': <Entry: adder app.tasks.add(1, 2) <crontab: */1 * * * * (m/h/d/dM/MY)>}
请注意,app.tasks.add 有
@celery.task
装饰器。
Celerybeat 将所有定期计划的任务存储在 PeriodicTask 表中。可以通过多种方式安排任务,包括 crontab、间隔等
为了动态添加计划任务,请创建所需类型的计划对象(下例中的
CrontabSchedule
),并将其传递到新的PeriodicTask
对象中。
from django_celery_beat.models import PeriodicTask, CrontabSchedule
# -- Inside the function you want to add task dynamically
schedule = CrontabSchedule.objects.create(minute='*/1')
PeriodicTask.objects.create(name='adder',
task='apps.task.add', crontab=schedule)
我建议您切换到 Celery Redbeat,而不是试图找到一个好的解决方法。
您可以通过启用自动重新加载来解决您的问题。
但是我不能 100% 确定它适用于您的配置文件,但如果位于 CELERY_IMPORTS 路径中,它应该适用。
Hoverer 请注意,此功能是实验性的,请勿在生产中使用。
如果你真的想要动态的 celerybeat 调度,你可以随时使用 另一个调度器,比如 django-celery 通过 django admin 来管理数据库上的定期任务。
我遇到了类似的问题,我想到的解决方案是预先定义一些通用的周期性任务(每1秒、每5分钟等),然后让它们从数据库获取要执行的函数列表。 每次您想要添加新任务时,只需在数据库中添加一个条目即可。