基于教程:https://www.merixstudio.com/blog/django-celery-beat/
celery.py文件代码
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from Backend.settings import INSTALLED_APPS
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'app.settings')
app = Celery('proj')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: INSTALLED_APPS)
tasks.py文件代码
from celery.task import task
@task(name='task1')
def emailt():
print("email func invoked")
# code...
settings.py
from __future__ import absolute_import
import os
import djcelery
from celery.schedules import crontab
djcelery.setup_loader()
INSTALLED_APPS = [
'djcelery',
'django_celery_beat',
....
]
REDIS_HOST = 'localhost'
REDIS_PORT = '6379'
BROKER_URL = 'redis://' + REDIS_HOST + ':' + REDIS_PORT + '/0'
CELERY_RESULT_BACKEND = 'redis://' + REDIS_HOST + ':' + REDIS_PORT + '/0'
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TIMEZONE = 'Asia/Kolkata'
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
CELERYBEAT_SCHEDULE = {
'task1':
{
'task': 'proj.tasks.emailt',
'schedule': crontab(hour='*', minute='1', day_of_week='mon,tue,wed,thu,fri,sat,sun'),
}
}
在一个命令外壳中,redis服务器和django py manage.py runserver正在运行。在另一个shell上,celery命令如下运行:celery -A proj.tasks beat -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler
日志文件表示celerybeat正在运行。
Configuration ->
. broker -> redis://localhost:6379/0
. loader -> djcelery.loaders.DjangoLoader
. scheduler -> django_celery_beat.schedulers.DatabaseScheduler
. logfile -> [stderr]@%INFO
. maxinterval -> 5.00 seconds (5s)
[*************: INFO/MainProcess] beat: Starting...
[*************: INFO/MainProcess] Writing entries...
[*************: INFO/MainProcess] DatabaseScheduler: Schedule changed.
[*************: INFO/MainProcess] Writing entries...
[*************: INFO/MainProcess] Writing entries...
但是,tasks.py中的emailt()函数仍然没有被调用。我无法找到celerybeat的问题。
[DatabaseScheduler是数据库调度程序的实现,不从CELERYBEAT_SCHEDULE
词典中接收任务
如果要使用这种类型的调度程序,则应通过django管理员或通过数据迁移/视图创建PeriodicTask
您可以通过创建CrontabSchedule并将其附加到PeriodicTask将crontab表示法与任务中的默认调度程序或DatabaseScheduler一起使用