如何从另一个模块接收celery任务

问题描述 投票:0回答:2

我可以成功从我的主应用程序接收 celery 任务 - 但是,我的系统无法从另一个模块接收任务。我在远程 Ubuntu 服务器上使用 celery 的主管。

draft1
是我的主应用程序,
post
是另一个模块(我无法从中接收任务的模块)。

draft1/__init__.py

#This will make sure the app is always imported when
#Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']

draft1/celery.py

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'draft1.settings')

app = Celery("draft1", broker=CELERY_BROKER_URL, include=['post'])
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) #post is in my installed apps

draft1/tasks.py

@periodic_task(run_every=timedelta(minutes=1))
def test_job():
    from polls.models import Question
    for i in Question.objects.all():
        if i.question_text == "test":
            i.question_text = "not_test"
            i.save()
    return HttpResponseRedirect('/')


@periodic_task(name='run_scheduled_jobs', run_every=timedelta(seconds=30))
def run_scheduled_jobs():
    return True

post/tasks.py

@periodic_task(name='test_post', run_every=timedelta(seconds=30))
def test_post():
    from .models import Post
    for i in Post.objects.all():
        if i.entered_category == "test":
            i.entered_category = "not_test"
            i.save()
    return HttpResponseRedirect('/')


@periodic_task(name='post_jobs', run_every=timedelta(seconds=30)) # task name found! celery will do its job
def post_jobs():
    # do whatever stuff you do
    return True

设置.py

CELERYBEAT_SCHEDULE = {
    'run_scheduled_jobs': {
        'task': 'run_scheduled_jobs', # the same goes in the task name
        'schedule': timedelta(seconds=45),
    },
    'test_job': {
            'task': 'tasks.test_job',
            'schedule': timedelta(minutes=3),
    },
    'post_jobs': {
        'task': 'post.tasks.post_jobs',  #i've also tried tasks.post_jobs
        'schedule': timedelta(minutes=1),
    },
    'test_post': {
        'task': 'post.tasks.test_post',
        'schedule': timedelta(seconds=45),
    }
}

这是我启动工作进程的命令:

celery -A draft1 worker -l info

芹菜节拍通过以下方式启动:

celery -A draft1 beat -l info --scheduler

来自

draft1/tasks.py
的任务是我收到的唯一任务:

这是为什么?

python django celery supervisord
2个回答
0
投票

您传递的

celery -A
开关显式地仅为
draft1
应用程序创建一个工作线程。您需要为
post
应用程序设置一个单独的工作线程,并为其设置一个
celery.py
,以便您可以为
post
任务创建一个单独的 Celery 实例。如果您愿意(而且您可能会这样做),这些显然可以共享相同的消息代理。


0
投票

您需要在 init.py 中写入您的模块:

from .celery import app as celery_app

__all__ = ("celery_app",)

然后在同一位置创建 celery.py:

import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE','name_project.settings')
app = Celery("name_your_django_module")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

在 django settings.py 中:

CELERY_IMPORTS = [
'name_module.file_with_tasks']


CELERY_BEAT_SCHEDULE = {
"name_tasks": {
    "task": "name_module.file_with_tasks.name_task_func",
    'schedule': timedelta(seconds=10),
},}

在我的项目中,主 djanfo 模块名称 - Social_network,带有 celery 的新模块 - 任务 https://i.stack.imgur.com/yx1zZ.png

© www.soinside.com 2019 - 2024. All rights reserved.