Celery Worker 找不到任务

问题描述 投票:0回答:1

我将worker与beats结合使用,beats发现一切都很完美,但是一旦beat将任务传递给worker,就会弹出错误:KeyError: 'tasks.cache_popular_posts' 所有信息:

Did you remember to import the module containing this task?
Or maybe you're using relative imports?

Please see
https://docs.celeryq.dev/en/latest/internals/protocol.html
for more information.

The full contents of the message body was:
b'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (77b)

The full contents of the message headers:
{'lang': 'py', 'task': 'tasks.cache_popular_posts', 'id': 'bdaa8c59-8b84-4f79-af1c-7fe34468894d', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': 'bdaa8c59-8b84-4f79-af1c-7fe34468894d', 'parent_id': None, 'argsrepr': '()', 'kwargsrepr': '{}', 'origin': 'gen3644@WIN-7483UVBHTO4', 'ignore_result': False, 'replaced_task_nesting': 0, 'stamped_headers': None, 'stamps': {}}

The delivery info for this task is:
{'exchange': '', 'routing_key': 'celery'}
Traceback (most recent call last):
  File "C:\Users\User\VsCodeProjects\Pastebin_hardEdition\venv\lib\site-packages\celery\worker\consumer\consumer.py", line 658, in on_task_received
    strategy = strategies[type_]
KeyError: 'tasks.cache_popular_posts'

此任务#posts/tasks.py

@app.task
def cache_popular_posts():
try:
    popular_posts = Post.objects.filter(views__gt=10)
    for post in popular_posts:
        hash = post.postmeta.hash_id
        cache.add(key=hash, value=post)
except Exception as ex:
    return ex

#app/celery.py

import os
from celery import Celery


os.environ.setdefault("DJANGO_SETTINGS_MODULE", "Pastebin_hardEdition.settings")

app = Celery("Pastebin_hardEdition")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()


#beats
app.conf.beat_schedule = {
    'cache-popular-post&Meta-every-30-seconds': {
        'task': 'tasks.cache_popular_posts',
        'schedule': 30.0,
    },
}
app.conf.timezone = 'UTC'

代理已配置并连接 #app/init.py

from .celery import app as celery_app

__all__ = ("celery_app",)

#posts/apps.py

class PostsConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'posts'

def ready(self):
    import posts.signals
    from . import tasks

我真的需要你的帮助

python django celery
1个回答
0
投票

我在您的代码中找不到任何问题。我对 Windows 的评论是基于我自己的经验和信息。

当我尝试在 Windows 上使用 django 设置 celery 时,无论我做什么,我都无法让它工作(我尝试了 100 多种可能的方法和指南)。

官方文档说他们不支持Windows。但他们支持Linux。

© www.soinside.com 2019 - 2024. All rights reserved.