Celery在包含其他文件名的文件中发现任务

问题描述 投票:8回答:4

我开始有这么多的芹菜任务,我想将它们分解成较小的文件,而不是让它们在一个大的tasks.py,但我没有设法让芹菜发现它们。

工作结构:

proj/
    app/
        tasks.py

结构我想工作:

proj/
    app/
        tasks/
            __init__.py
            task1.py
            task2.py
            ...

但这里芹菜找不到我的任务。我尝试设置CELERY_IMPORTS,但后来我必须指定每个文件,并且文件太多了。如果我将每个文件内容导入__init__.py,则相同。我希望自动发现所有任务,或至少任务目录中的文件中的所有任务。

我当然可以将每个任务目录中的所有文件列入CELERY_IMPORTS,但这看起来相当丑陋。

任何想法,如果这是可能的一个很好的方式?

python django celery
4个回答
3
投票

我已经放弃了找到一个很好的解决方案,而是编写了一个我调用并填充CELERY_IMPORTS的函数。这不好,但它确实有效。

这是未来参考的代码:

import os

def detect_tasks(project_root):
    tasks = []
    file_path = os.path.join(project_root, 'apps')
    for root, dirs, files in os.walk(file_path):
        for filename in files:
            if os.path.basename(root) == 'tasks':
                if filename != '__init__.py' and filename.endswith('.py'):
                    task = os.path.join(root, filename)\
                        .replace(os.path.dirname(project_root) + '/', '')\
                        .replace('/', '.')\
                        .replace('.py', '')
                    tasks.append(task)
    return tuple(tasks)

然后在设置中:

CELERY_IMPORTS = detect_tasks(project_root)

project_root会是这样的:

project_root = os.path.dirname(os.path.abspath(__file__))

3
投票
proj/
    app/
        tasks/
            __init__.py
            task1.py
            task2.py
            ...

如果您的文件结构如上,您可以执行以下操作

import os
from celery import Celery
from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings')

app = Celery('celery')
app.config_from_object('django.conf:settings', namespace='CELERY')

for app_name in settings.INSTALLED_APPS:
    if app_name.startswith('django'):
        continue
    for root, dirs, files in os.walk(app_name + '/tasks'):
        for file in files:
            if file.startswith('__') or file.endswith('.pyc') or not file.endswith('.py'):
                continue
            file = file[:-3]
            app.autodiscover_tasks([app_name + '.tasks'], related_name=file)

2
投票

celery默认搜索tasks.py的唯一原因是autodiscover_tasks的默认参数:

./loaders/base.py:def autodiscover_tasks(packages, related_name='tasks'):

如果使用文档建议的配置,则可以使用相应名称的非默认值为autodiscover_tasks调用您希望具有任务的不同文件名。例如,这是我们的celery.py:

from __future__ import absolute_import

import os

from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "settings")

from django.conf import settings

app = Celery('app')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS, related_name='tasks2')

0
投票

我尝试了@oloform的代码片段。它不适合我。我有一个非常简单的想法。因为芹菜会为任何任务寻找tasks.py。我在任何文件中定义任务,但我在tasks.py中添加它,如,

# tasks.py
from app.utilities.somename import upload_done
from project.celery import app
app.task(upload_done)

在芹菜原木中很容易看到,

 -------------- celery@######## v3.1.18 (Cipater)
---- **** -----
--- * ***  * -- ###############
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         project:0x2aae5c0
- ** ---------- .> transport:   django://localhost//
- ** ---------- .> results:     djcelery.backends.database:DatabaseBackend
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery


[tasks]
  . app.utilities.somename.upload_done

这是肮脏的方式但是,它的工作原理。

© www.soinside.com 2019 - 2024. All rights reserved.