如何在异步RestAPI中等待Celery任务结果?

问题描述 投票:0回答:1

我有一个使用 FastAPI 构建的 API,该端点将任务提交给 celery 工作人员,等待工作人员完成其工作并将结果返回给用户。

问题是等待结果的正确方法是什么?

端点代码

from tasks import celery_application, some_task
from celery.result import AsyncResult

@api.post('/submit')
async def submit(data: str):
    task = some_task.apply_async(kwargs={'data': data}, queue='some_queue')
    result = AsyncResult(id=task.task_id, app=celery_application).get()
    return {'task_result': result}

AsyncResult
的问题是
get
方法会阻塞应用程序,它会同步等待结果,同时API会冻结。

我想出的解决方案之一是循环检查 n 秒的结果

from tasks import celery_application, some_task
import asyncio
import redis


r = redis.Redis.from_url(REDIS_CONN_URI)


@api.post('/submit')
async def submit(data: str):
    task = some_task.apply_async(kwargs={'data': data}, queue='some_queue')
    result = None
    for _ in range(100):
        if r.exists(task.task_id):
            result = r.get(task.task_id)
            break
        await asyncio.sleep(0.3)

    return {'task_result': result}

但它只能部分起作用。虽然端点没有被阻止并且可以访问。当端点尝试再次到达发送任务时,它会被阻止。

python redis celery python-asyncio fastapi
1个回答
0
投票

是的,那个redis接口是阻塞的。 您应该切换到使用

aioredis
进行此池化,以便它可以与异步代码一起工作,或者将 redis 任务卡在 ThreadPoolExecutor 中 - 这样主线程就不会阻塞等待
.get
的返回.

第二种方法不需要您对代码或必要条件进行任何更改 - 只需创建一个可以按流程使用的合适的工作池。这应该与您每秒收到的请求数成正比,并且不受您拥有的 CPU 核心数量的限制:工作线程中的大多数线程在等待结果时不会执行任何操作。 (不幸的是,Python 执行器模型不提供动态池,否则,根据负载调整池大小可能是一个不错的选择)。无论如何,如果 celery 工作线程是本地的,则您可以同时获得的结果数量受到限制 - 因此标准 2X CPU 核心数可能就足够了。否则,我相信,最多 100 或 200 个线程的线程池执行器可以适用于中型虚拟机,并为您提供每秒大约 1000 秒请求的良好吞吐量。


from tasks import celery_application, some_task
import asyncio
import redis

MAXWORKERS = 24 # check text. 
executor = concurrent.futures.ThreadPoolExecutor(MAXWORKERS)

r = redis.Redis.from_url(REDIS_CONN_URI)


@api.post('/submit')
async def submit(data: str):
    task = some_task.apply_async(kwargs={'data': data}, queue='some_queue')
    result_call = AsyncResult(id=task.task_id, app=celery_application).get
    result = await asyncio.run_in_executor(executor, result_call)
    return {'task_result': result}
    return {'task_result': result}

当然,这假设您的前端可以轻松等待

.get()
返回 - 我假设最多需要几秒钟。这不会阻止您的 HTTP API。
如果结果较长,那么您确实必须重新构建事物,以便立即向调用者返回响应,然后再提供一些反馈。 Asyncio 也可以用于此目的,具体取决于您打算提供的反馈,通过注册一个回调函数以在
.get
返回时运行 - 在这种情况下,您可以调用
asyncio.create_task
,将
.run_in_executor
调用传递给它,然后对返回的任务调用
.add_done_callback
。 (此外,您将需要一个数据结构(
set
)来跟踪正在进行的任务,否则它们可能会被事件循环取消引用)

© www.soinside.com 2019 - 2024. All rights reserved.