一些Celery任务有效,另一些则是NotRegistered

问题描述 投票:9回答:3

我遵循the Celery Django教程,我在示例中看到的任务(add, mul)完美地为我工作。当我做res = add.delay(1,2); res.get()时,我得到了正确的答案。

但是当我尝试执行另一个任务*** NotRegistered: u'pipeline.tasks.sayhello'时,我得到了res = sayhello.delay('trex')

如果我做res = sayhello('trex')然后我只需输入res即可得到结果。但是通过这种方式,我在不使用Celery的情况下执行ornidarly函数。

只有当我在Django shell ./manage shell中运行它时,该任务才有效

>>> res = sayhello.delay('trex')
>>> res.get()
u'Hello trex'

所以,问题是我无法从sayhello执行pipeline/views.py任务。但我可以从那里执行任务addmul

这是为什么?如何从views.py正确运行任务?

错误已完整消息:

[2016-11-11 10:56:09,870: ERROR/MainProcess] Received unregistered task of type u'pipeline.tasks.sayhello'.
The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you're using relative imports?

Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.

The full contents of the message body was:
'[["tiger"], {}, {"chord": null, "callbacks": null, "errbacks": null, "chain": null}]' (84b)
Traceback (most recent call last):
  File "/home/trex/Development/Sirius/new/rocket/rocket-venv/local/lib/python2.7/site-packages/celery/worker/consumer/consumer.py", line 549, in on_task_received
    strategy = strategies[type_]
KeyError: u'pipeline.tasks.sayhello'

Django版

1.9.7

芹菜版:

celery==4.0.0
django-celery==3.1.17

Django项目目录树:

rocket
├── etl
│   ├── etl
│   │   ├── celery.py
│   │   ├── __init__.py
│   │   ├── settings
│   │   │   ├── base.py
│   │   │   ├── dev.py
│   │   │   ├── __init__.py
│   │   │   ├── production.py
│   │   │   └── test.py
│   │   ├── urls.py
│   │   ├── wsgi.py
│   ├── manage.py
│   ├── pipeline
│   │   ├── __init__.py
│   │   ├── models.py
│   │   ├── tasks.py
│   │   ├── tests.py
│   │   ├── urls.py
│   │   ├── views.py

ETL /管道/ views.py

from .tasks import *

def get_response(request):
    result = add.delay(1, 2)
    result.get()
    result = sayhello.delay('tiger')
    result.get()

ETL /管道/ tasks.py

from __future__ import absolute_import, unicode_literals
from celery import shared_task

@shared_task
def add(x, y):
    return x + y

@shared_task
def mul(x, y):
    return x * y

@shared_task
def sayhello(name):
    return "Hello %s" % name

我也试过这个:

from celery.decorators import task

@task(name="sayhello")
def sayhello(name):
    return "Hello {0}".format(name)

ETL / celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'etl.settings.base')
app = Celery('etl')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

ETL / __ init__py

from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app
__all__ = ['celery_app']

ETL /设置/ base.py

...
CELERY_BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Europe/London'
CELERY_IMPORTS = ('pipeline.tasks', )
python django celery django-celery
3个回答
5
投票

该错误是因为CELERY_IMPORTS设置无法正常进入您的etl / settings / base.py文件。所以我的建议是:

从中删除逗号

CELERY_IMPORTS = ('pipeline.tasks' , )

如果问题仍然存在,请运行以下命令:

celery -A pipeline.tasks worker --loglevel=DEBUG

还有一件事,你的tasks.py文件需要在一个Django应用程序(在settings.py中注册)才能导入。所以请检查这一点。谢谢。


0
投票

这可能有希望帮助某人。我修改了我的代码而忽略了重启芹菜工人。

Try restarting the celery workers


0
投票

相对导入和自动名称生成不能很好地结合在一起,因此如果您使用相对导入,则应明确设置名称。

例如,如果客户端将模块“myapp.tasks”导入为“.tasks”,并且工作人员将模块导入为“myapp.tasks”,则生成的名称将不匹配,并且工作人员将引发NotRegistered错误。

http://docs.celeryproject.org/en/latest/userguide/tasks.html#task-naming-relative-imports

© www.soinside.com 2019 - 2024. All rights reserved.