How do I register a Celery class-based task with a custom name in 5.x?

Question · votes: 0 · answers: 1

Architecture: a Flask application running in its own container, and Celery running in its own container. The message broker is Redis. This is all Python, by the way.

Problem: I can't figure out how to register a class-based task with a custom name.

Versions:

celery==5.2.7
redis==5.0.3

Details: I want to build class-based tasks because I want to be able to use an on_success handler on each task. Note that all of the class-based tasks are defined in the Celery container, so the Flask application has no way of knowing what they are; it has to be able to invoke them by name. I tried using the default name, but I couldn't get the Celery class-based task to work that way: it requires a name. I also tried configuring a custom name. Every time I try, I get this error:

[2024-03-12 22:53:01,292: CRITICAL/MainProcess] Unrecoverable error: AttributeError("'NoneType' object has no attribute 'push'")
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/celery/worker/worker.py", line 203, in start
    self.blueprint.start(self)
  File "/usr/local/lib/python3.9/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/usr/local/lib/python3.9/site-packages/celery/bootsteps.py", line 365, in start
    return self.obj.start()
  File "/usr/local/lib/python3.9/site-packages/celery/worker/consumer/consumer.py", line 332, in start
    blueprint.start(self)
  File "/usr/local/lib/python3.9/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/usr/local/lib/python3.9/site-packages/celery/worker/consumer/tasks.py", line 26, in start
    c.update_strategies()
  File "/usr/local/lib/python3.9/site-packages/celery/worker/consumer/consumer.py", line 562, in update_strategies
    task.__trace__ = build_tracer(name, task, loader, self.hostname,
  File "/usr/local/lib/python3.9/site-packages/celery/app/trace.py", line 361, in build_tracer
    push_request = request_stack.push
AttributeError: 'NoneType' object has no attribute 'push'

All I really want is a way to have a Celery task perform an action after it finishes. on_success seems like the right approach, but I don't know how to use it outside of a class-based task, and I can't get class-based tasks to work. Can anyone help?

Tags: class, celery, task, naming
1 Answer

0 votes

I gave up on trying to use class-based tasks; I couldn't figure out how to make them work. I ended up using the task_success signal to perform an action after a task completes.

import time
import logging

from celery.signals import task_success

# `celery` (the app instance), `db`, `send_async_response`, and
# `customer_work` come from the application code.
logger = logging.getLogger(__name__)


@task_success.connect
def task_success_handler(sender=None, result=None, **kwargs):
    if sender.name == 'customer':
        # Extract fields from the task's result dictionary
        notification_url = result['notification_url']
        job_id = result['job_id']
        job_status = result['status']
        task_id = sender.request.id
        task_state = celery.AsyncResult(task_id).state

        logger.info(f"Task {task_id}: {task_state}, job_id: {job_id}: {job_status}")

        # Send the async response back to the caller
        send_async_response(db, notification_url, task_id, job_id, "200", job_status, task_state, "Task succeeded for customer.")


@celery.task(name='customer')
def backend(jobs, job_id, backend_config, method, auth_user, notification_url, sleep_time=0):
    logger.info(f"sleeping for {sleep_time} sec")
    time.sleep(int(sleep_time))
    outcome = customer_work(jobs, job_id, backend_config, method, auth_user, notification_url)
    return outcome

© www.soinside.com 2019 - 2024. All rights reserved.