我可以成功从我的主应用程序接收 celery 任务 - 但是,我的系统无法从另一个模块接收任务。我在远程 Ubuntu 服务器上使用 celery 的主管。
draft1
是我的主应用程序,post
是另一个模块(我无法从中接收任务的模块)。
draft1/__init__.py
#This will make sure the app is always imported when
#Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ['celery_app']
draft1/celery.py
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'draft1.settings')
app = Celery("draft1", broker=CELERY_BROKER_URL, include=['post'])
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) #post is in my installed apps
draft1/tasks.py
@periodic_task(run_every=timedelta(minutes=1))
def test_job():
from polls.models import Question
for i in Question.objects.all():
if i.question_text == "test":
i.question_text = "not_test"
i.save()
return HttpResponseRedirect('/')
@periodic_task(name='run_scheduled_jobs', run_every=timedelta(seconds=30))
def run_scheduled_jobs():
return True
post/tasks.py
@periodic_task(name='test_post', run_every=timedelta(seconds=30))
def test_post():
from .models import Post
for i in Post.objects.all():
if i.entered_category == "test":
i.entered_category = "not_test"
i.save()
return HttpResponseRedirect('/')
@periodic_task(name='post_jobs', run_every=timedelta(seconds=30)) # task name found! celery will do its job
def post_jobs():
# do whatever stuff you do
return True
设置.py
CELERYBEAT_SCHEDULE = {
'run_scheduled_jobs': {
'task': 'run_scheduled_jobs', # the same goes in the task name
'schedule': timedelta(seconds=45),
},
'test_job': {
'task': 'tasks.test_job',
'schedule': timedelta(minutes=3),
},
'post_jobs': {
'task': 'post.tasks.post_jobs', #i've also tried tasks.post_jobs
'schedule': timedelta(minutes=1),
},
'test_post': {
'task': 'post.tasks.test_post',
'schedule': timedelta(seconds=45),
}
}
这是我启动工作进程的命令:
celery -A draft1 worker -l info
芹菜节拍通过以下方式启动:
celery -A draft1 beat -l info --scheduler
。
来自
draft1/tasks.py
的任务是我收到的唯一任务:
这是为什么?
您传递的
celery -A
开关显式地仅为 draft1
应用程序创建一个工作线程。您需要为 post
应用程序设置一个单独的工作线程,并为其设置一个 celery.py
,以便您可以为 post
任务创建一个单独的 Celery 实例。如果您愿意(而且您可能会这样做),这些显然可以共享相同的消息代理。
您需要在 init.py 中写入您的模块:
from .celery import app as celery_app
__all__ = ("celery_app",)
然后在同一位置创建 celery.py:
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE','name_project.settings')
app = Celery("name_your_django_module")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
在 django settings.py 中:
CELERY_IMPORTS = [
'name_module.file_with_tasks']
CELERY_BEAT_SCHEDULE = {
"name_tasks": {
"task": "name_module.file_with_tasks.name_task_func",
'schedule': timedelta(seconds=10),
},}