FastAPI: Celery worker not executing tasks and tasks stay in Unacked


I wrote myself a small test application to get the hang of Celery and RabbitMQ. All of my messages seem to stay stuck in the "Unacked" state in RabbitMQ... The task is supposed to be executed by the worker and the result stored in my local MongoDB (I'm not sure whether that part is set up correctly yet), and I also want all messages to go through a specific queue named "tasks". I need to be able to call the "check_task_status" endpoint, but about 20 seconds after calling "execute_task", "check_task_status" still reports "PENDING". I can't pinpoint the problem.
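
For reference, the stuck deliveries can also be confirmed on the broker side. A minimal check, assuming a default local RabbitMQ installation with rabbitmqctl available:

rabbitmqctl list_queues name messages_ready messages_unacknowledged

If the worker receives the tasks but never acknowledges them, the "tasks" queue shows a growing messages_unacknowledged count here.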

I started my worker with:

celery -A tasks worker -Q tasks --loglevel=INFO
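
(For context, the SpawnPoolWorker entries and Windows paths in the log below indicate a Windows host, where Celery's default prefork pool is known to behave differently; a variant of the same command using the solo pool would be:

celery -A tasks worker -Q tasks --loglevel=INFO --pool=solo

whether that is actually relevant to the problem here is only an assumption.)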

Quick note: initially my worker ran without any errors, but after waiting a while (with the worker still running) a new "timeout" error message appeared in the terminal where the worker runs. This is my terminal output:

[2023-08-02 09:21:53,043: INFO/MainProcess] mingle: all alone
[2023-08-02 09:21:53,073: INFO/MainProcess] celery@DESKTOP-490VADQ ready.
[2023-08-02 09:22:15,655: INFO/MainProcess] Task tasks.sample_task[9c586484-102d-47a7-859a-693026f5ed29] received
[2023-08-02 09:22:16,612: INFO/SpawnPoolWorker-9] child process 5936 calling self.run()
[2023-08-02 09:22:16,621: INFO/SpawnPoolWorker-10] child process 29216 calling self.run()
[2023-08-02 09:22:16,636: INFO/SpawnPoolWorker-11] child process 10352 calling self.run()
[2023-08-02 09:22:17,435: INFO/SpawnPoolWorker-12] child process 10776 calling self.run()
[2023-08-02 09:22:18,235: INFO/SpawnPoolWorker-13] child process 14540 calling self.run()
[2023-08-02 09:29:50,785: INFO/MainProcess] Task tasks.sample_task[7451e7e0-3a57-4e3b-9481-7ef4895ab1b4] received
[2023-08-02 09:50:52,381: INFO/MainProcess] Task tasks.sample_task[2b54a897-b59b-4268-b9da-f7ad85f022ef] received
[2023-08-02 09:51:34,184: INFO/MainProcess] Task tasks.sample_task[b265be88-ff2d-4865-884a-5b0bc5e2b58b] received
[2023-08-02 09:52:15,686: CRITICAL/MainProcess] Unrecoverable error: PreconditionFailed(406, 'PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Timeout value used: 1800000 ms. This timeout value can be configured, see consumers doc guide to learn more', (0, 0), '')
Traceback (most recent call last):
  File "C:\Users\Emile\Desktop\Projects\emile-test-2\.venv\lib\site-packages\celery\worker\worker.py", line 202, in start
    self.blueprint.start(self)
  File "C:\Users\Emile\Desktop\Projects\emile-test-2\.venv\lib\site-packages\celery\bootsteps.py", line 116, in start
    step.start(parent)
  File "C:\Users\Emile\Desktop\Projects\emile-test-2\.venv\lib\site-packages\celery\bootsteps.py", line 365, in start
    return self.obj.start()
  File "C:\Users\Emile\Desktop\Projects\emile-test-2\.venv\lib\site-packages\celery\worker\consumer\consumer.py", line 336, in start
    blueprint.start(self)
  File "C:\Users\Emile\Desktop\Projects\emile-test-2\.venv\lib\site-packages\celery\bootsteps.py", line 116, in start
    step.start(parent)
  File "C:\Users\Emile\Desktop\Projects\emile-test-2\.venv\lib\site-packages\celery\worker\consumer\consumer.py", line 726, in start
    c.loop(*c.loop_args())
  File "C:\Users\Emile\Desktop\Projects\emile-test-2\.venv\lib\site-packages\celery\worker\loops.py", line 130, in 
synloop
    connection.drain_events(timeout=2.0)
  File "C:\Users\Emile\Desktop\Projects\emile-test-2\.venv\lib\site-packages\kombu\connection.py", line 341, in drain_events
    return self.transport.drain_events(self.connection, **kwargs)
  File "C:\Users\Emile\Desktop\Projects\emile-test-2\.venv\lib\site-packages\kombu\transport\pyamqp.py", line 171, 
in drain_events
    return connection.drain_events(**kwargs)
  File "C:\Users\Emile\Desktop\Projects\emile-test-2\.venv\lib\site-packages\amqp\connection.py", line 525, in drain_events
    while not self.blocking_read(timeout):
  File "C:\Users\Emile\Desktop\Projects\emile-test-2\.venv\lib\site-packages\amqp\connection.py", line 531, in blocking_read
    return self.on_inbound_frame(frame)
  File "C:\Users\Emile\Desktop\Projects\emile-test-2\.venv\lib\site-packages\amqp\method_framing.py", line 53, in on_frame
    callback(channel, method_sig, buf, None)
  File "C:\Users\Emile\Desktop\Projects\emile-test-2\.venv\lib\site-packages\amqp\connection.py", line 537, in on_inbound_method
    return self.channels[channel_id].dispatch_method(
  File "C:\Users\Emile\Desktop\Projects\emile-test-2\.venv\lib\site-packages\amqp\abstract_channel.py", line 156, in dispatch_method
    listener(*args)
  File "C:\Users\Emile\Desktop\Projects\emile-test-2\.venv\lib\site-packages\amqp\channel.py", line 293, in _on_close
    raise error_for_code(
amqp.exceptions.PreconditionFailed: (0, 0): (406) PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Timeout value used: 1800000 ms. This timeout value can be configured, see consumers doc guide to learn more  
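
For reference, the 1800000 ms in the error is RabbitMQ's default consumer acknowledgement timeout (30 minutes). It can be raised in rabbitmq.conf, for example (the value below is only an illustration):

# rabbitmq.conf
consumer_timeout = 3600000

Raising it only postpones the error, though; it fires because the deliveries were never acknowledged within the window in the first place.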

Here is the code in my Python project:

main.py:

from fastapi import FastAPI
from tasks import sample_task, celery

app = FastAPI()

@app.get("/test")
def execute_task():
    task = sample_task.apply_async(queue='tasks')  # Send the task to the worker
    return {'task_id': task.id}

@app.get('/check_task_status/{task_id}')
def check_task_status(task_id: str):
    task_result = celery.AsyncResult(task_id)
    if task_result.state == 'PENDING':
        status = 'Task is pending'
    elif task_result.state == 'SUCCESS':
        status = 'Task completed successfully'
    elif task_result.state == 'FAILURE':
        status = 'Task failed'
    else:
        status = 'Task is in progress'
    return {'task_id': task_id, 'status': status}
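
For illustration, this is roughly how the two endpoints are exercised (assuming uvicorn serving the app on the default localhost:8000; the task id is just a placeholder):

curl http://localhost:8000/test
# -> {"task_id": "..."}
curl http://localhost:8000/check_task_status/<task_id>
# -> {"task_id": "...", "status": "Task is pending"} even ~20 seconds later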

tasks.py:

from celery import Celery
import time

celery = Celery('tasks', broker='amqp://guest:guest@localhost:5672//', backend='rpc://')
celery.config_from_object('celeryconfig')

@celery.task(queue='tasks', max_retries=5, retry_backoff=True, retry_backoff_max=1)
def sample_task():
    for i in range(4):
        time.sleep(5)
    print("Task Completed")

celeryconfig.py:

broker_url = 'pyamqp://'
result_backend = 'mongodb://localhost:27017/'
imports = ('tasks')
mongodb_backend_settings = {
    'database': 'capetown',
    'taskmeta_collection': 'tasks-info',
}
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
task_routes = {
    'tasks.add': 'low-priority',
}
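
For what it's worth, once a task actually completes, its result document should show up in the configured MongoDB collection (a minimal check, assuming pymongo is installed and the settings above are in effect):

from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017/')
print(client['capetown']['tasks-info'].count_documents({}))  # number of stored task results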

Any help would be greatly appreciated. Have a nice day.

python mongodb rabbitmq celery