如何设置Celerybeat和Flask的定期任务,每小时查询一次数据库?
环境看起来像这样:
/
|-app
|-__init__.py
|-jobs
|-task.py
|-celery-beat.sh
|-celery-worker.sh
|-manage.py
我目前有一个名为run_query()
的查询函数位于task.py
我希望调度程序在应用程序启动后启动,所以我在/app/__init__.py
文件夹中有以下行:
celery = Celery()
@celery.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(1, app.jobs.task.run_query())
(为简单起见,我已将其设置为如果它运行,它将每分钟运行。还没有运气。)
当我发射celery-worker.sh
时,它识别我在[tasks]
标题下的功能。但预定的功能永远不会运行。我可以通过在命令提示符处发出以下命令来手动强制运行该函数:
>> from app.jobs import task
>> task.run_query.delay()
编辑:添加了celerybeat.sh
作为后续:如果通过烧瓶上下文访问数据库,在我的异步函数调用期间创建一个新的烧瓶上下文来访问数据库是明智的吗?使用现有的烧瓶背景?或者完全忘记上下文并只是启动与数据库的连接?我担心的是,如果我只是发起新连接,它可能会干扰现有的上下文连接?
要运行定期任务,您需要某种调度程序(例如芹菜节拍)。
芹菜拍是一种调度者;它定期启动任务,然后由群集中的可用工作节点执行。
您必须确保一次只有一个调度程序正在运行,否则您最终会遇到重复的任务。使用集中式方法意味着不必同步调度,并且服务可以在不使用锁的情况下运行。
您可以使用命令调用scheduler,
$ celery -A proj beat #different process from your worker
您还可以通过启用workers -B选项在工作者中嵌入节拍,如果您永远不会运行多个工作节点,这很方便,但它并不常用,因此不建议用于生产用途启动调度程序:
$ celery -A proj worker -B