架构:Flask 应用程序在自己的容器中运行,Celery 在自己的容器中运行。消息代理是Redis。顺便说一句,这都是Python。
问题:我不知道如何使用自定义名称注册基于类的任务。
版本:
celery==5.2.7
redis==5.0.3
详细信息:我想构建基于类的任务,因为我希望能够在每个任务上使用
on_success
处理程序。请注意,所有基于类的任务都是在 celery 容器中定义的,因此 Flask 应用程序无法知道它们是什么。 Flask 应用程序必须能够通过“名称”来调用它们。我尝试使用默认名称,但我无法使用基于 celery 的类,它需要一个名称。我还尝试配置自定义名称。每次我尝试这样做时都会收到此错误
[2024-03-12 22:53:01,292: CRITICAL/MainProcess] Unrecoverable error: AttributeError("'NoneType' object has no attribute 'push'")
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/celery/worker/worker.py", line 203, in start
self.blueprint.start(self)
File "/usr/local/lib/python3.9/site-packages/celery/bootsteps.py", line 116, in start
step.start(parent)
File "/usr/local/lib/python3.9/site-packages/celery/bootsteps.py", line 365, in start
return self.obj.start()
File "/usr/local/lib/python3.9/site-packages/celery/worker/consumer/consumer.py", line 332, in start
blueprint.start(self)
File "/usr/local/lib/python3.9/site-packages/celery/bootsteps.py", line 116, in start
step.start(parent)
File "/usr/local/lib/python3.9/site-packages/celery/worker/consumer/tasks.py", line 26, in start
c.update_strategies()
File "/usr/local/lib/python3.9/site-packages/celery/worker/consumer/consumer.py", line 562, in update_strategies
task.__trace__ = build_tracer(name, task, loader, self.hostname,
File "/usr/local/lib/python3.9/site-packages/celery/app/trace.py", line 361, in build_tracer
push_request = request_stack.push
AttributeError: 'NoneType' object has no attribute 'push'
我真的只是想找到一种方法让 celery 任务在完成任务后执行操作。
on_success
似乎是正确的做法,但我不知道如何在基于类的任务之外使用它,并且我无法让基于类的任务工作。有人可以帮忙吗?
我放弃了尝试使用基于班级的任务。不知道如何让它发挥作用。 我最终使用 task_success 在任务完成后执行操作。
@task_success.connect
def task_success_handler(sender=None, result=None, **kwargs):
if sender.name == 'customer':
# Extract result dictionary
notification_url = result['notification_url']
job_id = result['job_id']
job_status = result['status']
task_id = sender.request.id
task_state = celery.AsyncResult(task_id).state
logger.info(f"Task {task_id}: {task_state}, job_id: {job_id}: {job_status}")
# send async response
send_async_response(db, notification_url, task_id, job_id, "200", job_status, task_state, f"Task succeeded for customer.")
@celery.task(name='customer')
def backend(jobs, job_id, backend_config, method, auth_user, notification_url, sleep_time=0):
logger.info(f"sleeping for {str(sleep_time)} sec")
time.sleep(int(sleep_time))
outcome = customer_work(jobs, job_id, backend_config, method, auth_user, notification_url)
return outcome