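I wrote myself a test application to get the hang of Celery and RabbitMQ. All of my messages seem to get stuck in the "Unacked" state on RabbitMQ. The task should be executed by the worker, and the result must be stored in my local MongoDB (I'm not sure whether I've set this up correctly so far). I also want all messages to go through a specific queue named "tasks". I need to be able to call the "check_task_status" endpoint, but about 20 seconds after calling "execute_task", the "check_task_status" endpoint still returns "PENDING". I can't pinpoint the problem.
I start my worker with: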
celery -A tasks worker -Q tasks --loglevel=INFO
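Quick note: initially my worker ran without any errors, but after I left it running for a while, a new "timed out" error message appeared in the terminal where the worker runs. Here is my terminal output: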
[2023-08-02 09:21:53,043: INFO/MainProcess] mingle: all alone
[2023-08-02 09:21:53,073: INFO/MainProcess] celery@DESKTOP-490VADQ ready.
[2023-08-02 09:22:15,655: INFO/MainProcess] Task tasks.sample_task[9c586484-102d-47a7-859a-693026f5ed29] received
[2023-08-02 09:22:16,612: INFO/SpawnPoolWorker-9] child process 5936 calling self.run()
[2023-08-02 09:22:16,621: INFO/SpawnPoolWorker-10] child process 29216 calling self.run()
[2023-08-02 09:22:16,636: INFO/SpawnPoolWorker-11] child process 10352 calling self.run()
[2023-08-02 09:22:17,435: INFO/SpawnPoolWorker-12] child process 10776 calling self.run()
[2023-08-02 09:22:18,235: INFO/SpawnPoolWorker-13] child process 14540 calling self.run()
[2023-08-02 09:29:50,785: INFO/MainProcess] Task tasks.sample_task[7451e7e0-3a57-4e3b-9481-7ef4895ab1b4] received
[2023-08-02 09:50:52,381: INFO/MainProcess] Task tasks.sample_task[2b54a897-b59b-4268-b9da-f7ad85f022ef] received
[2023-08-02 09:51:34,184: INFO/MainProcess] Task tasks.sample_task[b265be88-ff2d-4865-884a-5b0bc5e2b58b] received
[2023-08-02 09:52:15,686: CRITICAL/MainProcess] Unrecoverable error: PreconditionFailed(406, 'PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Timeout value used: 1800000 ms. This timeout value can be configured, see consumers doc guide to learn more', (0, 0), '')
Traceback (most recent call last):
File "C:\Users\Emile\Desktop\Projects\emile-test-2\.venv\lib\site-packages\celery\worker\worker.py", line 202, in start
self.blueprint.start(self)
File "C:\Users\Emile\Desktop\Projects\emile-test-2\.venv\lib\site-packages\celery\bootsteps.py", line 116, in start
step.start(parent)
File "C:\Users\Emile\Desktop\Projects\emile-test-2\.venv\lib\site-packages\celery\bootsteps.py", line 365, in start
return self.obj.start()
File "C:\Users\Emile\Desktop\Projects\emile-test-2\.venv\lib\site-packages\celery\worker\consumer\consumer.py", line 336, in start
blueprint.start(self)
File "C:\Users\Emile\Desktop\Projects\emile-test-2\.venv\lib\site-packages\celery\bootsteps.py", line 116, in start
step.start(parent)
File "C:\Users\Emile\Desktop\Projects\emile-test-2\.venv\lib\site-packages\celery\worker\consumer\consumer.py", line 726, in start
c.loop(*c.loop_args())
File "C:\Users\Emile\Desktop\Projects\emile-test-2\.venv\lib\site-packages\celery\worker\loops.py", line 130, in synloop
connection.drain_events(timeout=2.0)
File "C:\Users\Emile\Desktop\Projects\emile-test-2\.venv\lib\site-packages\kombu\connection.py", line 341, in drain_events
return self.transport.drain_events(self.connection, **kwargs)
File "C:\Users\Emile\Desktop\Projects\emile-test-2\.venv\lib\site-packages\kombu\transport\pyamqp.py", line 171, in drain_events
return connection.drain_events(**kwargs)
File "C:\Users\Emile\Desktop\Projects\emile-test-2\.venv\lib\site-packages\amqp\connection.py", line 525, in drain_events
while not self.blocking_read(timeout):
File "C:\Users\Emile\Desktop\Projects\emile-test-2\.venv\lib\site-packages\amqp\connection.py", line 531, in blocking_read
return self.on_inbound_frame(frame)
File "C:\Users\Emile\Desktop\Projects\emile-test-2\.venv\lib\site-packages\amqp\method_framing.py", line 53, in on_frame
callback(channel, method_sig, buf, None)
File "C:\Users\Emile\Desktop\Projects\emile-test-2\.venv\lib\site-packages\amqp\connection.py", line 537, in on_inbound_method
return self.channels[channel_id].dispatch_method(
File "C:\Users\Emile\Desktop\Projects\emile-test-2\.venv\lib\site-packages\amqp\abstract_channel.py", line 156, in dispatch_method
listener(*args)
File "C:\Users\Emile\Desktop\Projects\emile-test-2\.venv\lib\site-packages\amqp\channel.py", line 293, in _on_close
raise error_for_code(
amqp.exceptions.PreconditionFailed: (0, 0): (406) PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Timeout value used: 1800000 ms. This timeout value can be configured, see consumers doc guide to learn more
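One thing I noticed while reading the log: the channel seems to have been closed almost exactly 30 minutes after the first task was received, which would match the 1800000 ms in the error message. Below is a small sketch I used to check that arithmetic. The two timestamps are copied from the log above; that the timeout is counted from the first "received" line is only my assumption.

```python
from datetime import datetime, timedelta

# Timestamps copied from the worker log above.
FMT = "%Y-%m-%d %H:%M:%S,%f"
first_received = datetime.strptime("2023-08-02 09:22:15,655", FMT)
channel_closed = datetime.strptime("2023-08-02 09:52:15,686", FMT)

# "Timeout value used" from the PRECONDITION_FAILED message, in milliseconds.
timeout_ms = 1_800_000

# Whole milliseconds between the first "received" line and the CRITICAL line.
elapsed_ms = (channel_closed - first_received) // timedelta(milliseconds=1)

print(f"elapsed: {elapsed_ms} ms")
print(f"past the timeout by: {elapsed_ms - timeout_ms} ms")
```

So the first task appears to have stayed unacknowledged for the full 30 minutes, rather than the error being tied to one of the later tasks.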
Here is the code in my Python project:
main.py:
from fastapi import FastAPI
from tasks import sample_task, celery

app = FastAPI()

@app.get("/test")
def execute_task():
    task = sample_task.apply_async(queue='tasks')  # Send the task to the worker
    return {'task_id': task.id}

@app.get('/check_task_status/{task_id}')
def check_task_status(task_id: str):
    task_result = celery.AsyncResult(task_id)
    if task_result.state == 'PENDING':
        status = 'Task is pending'
    elif task_result.state == 'SUCCESS':
        status = 'Task completed successfully'
    elif task_result.state == 'FAILURE':
        status = 'Task failed'
    else:
        status = 'Task is in progress'
    return {'task_id': task_id, 'status': status}
tasks.py:
from celery import Celery
import time

celery = Celery('tasks', broker='amqp://guest:guest@localhost:5672//', backend='rpc://')
celery.config_from_object('celeryconfig')

@celery.task(queue='tasks', max_retries=5, retry_backoff=True, retry_backoff_max=1)
def sample_task():
    for i in range(4):
        time.sleep(5)
    print("Task Completed")
celeryconfig.py:
broker_url = 'pyamqp://'
result_backend = 'mongodb://localhost:27017/'
imports = ('tasks')
mongodb_backend_settings = {
    'database': 'capetown',
    'taskmeta_collection': 'tasks-info',
}
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
task_routes = {
    'tasks.add': 'low-priority',
}
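A side observation about the config, in case it is relevant: in plain Python, `('tasks')` without a trailing comma is just a string, not a tuple. The sketch below only demonstrates that language behaviour; whether Celery treats the two forms differently is something I have not verified, and `imports_tuple` is a name I made up for the comparison.

```python
# Same expression as in celeryconfig.py: parentheses, no trailing comma.
imports = ('tasks')
print(type(imports).__name__)        # a plain string

# Hypothetical variant with a trailing comma: a one-element tuple.
imports_tuple = ('tasks',)
print(type(imports_tuple).__name__)  # a tuple containing one string
```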
Any help would be appreciated. Have a nice day.