我正在开发一个 Django 项目,其中使用 Celery 进行任务调度。我有一个应该每分钟运行一次的定期任务,但 Celery Beat 似乎没有执行此任务。这是我的设置:
任务.py:
from celery.schedules import crontab
from ..apps.scraper.scrapers.updates import get_updates
from .settings import app
app.conf.beat_schedule = {
"get_updates": {
"task": "..apps.scraper.scrapers.updates.get_updates",
"schedule": crontab(minute="*/1"), # Run every 1 minute
},
}
celery.py:
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "manga_project.settings")
app = Celery("manga_project")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
app.conf.broker_url = "redis://localhost:6379/0"
更新.py:
@app.task
def get_updates():
print("Looking for manga updates...")
mangas = Manga.objects.all()
for manga in mangas:
print("Checking ", manga.title, " ...")
scrape_chapters("str", manga.url)
我希望 get_updates 任务每分钟运行一次,但它没有发生。当我启动 Celery Beat 时,当我检查调试器时,它似乎没有执行此任务。
我尝试重新启动 celery 并添加一些错误处理,但任务只是没有被执行,所以我不知道如何处理这个问题。
如有任何建议,我们将不胜感激。
autodiscover_tasks
在包列表中搜索“tasks.py”模块,这是搜索的默认 related_name
。
由于您的任务位于
updates.py
因此将
app.autodiscover_tasks()
更改为 app.autodiscover_tasks(related_name='updates')
以检查更新.py
阅读autodiscover_tasks参数及其作用
如果您的任务文件不在同一相对位置,您可以查看此处 Celery 发现具有其他文件名的文件中的任务
当然,如果可行的话,您可以将
updates.py
更改为 tasks.py