在 FastAPI/Starlette 中使用 websockets 时如何运行后台任务?

问题描述 投票:0回答:1

我的最终目标是编写代码,仅在调用end_point时触发其他服务器,并等待Redis中特定通道的数据进来。

我不想知道external_server的业务逻辑是否做得好,因为延迟

call_external_server是通知external_server发送数据到redis(pub/sub)的触发函数

但是后台任务不执行!

这是我的代码

async def call_external_server(channel, text):
    print("call_external_server start")
    async with aiohttp.ClientSession() as session:
        async with session.get(f"http://localhost:9000/pub?channel={channel}&text={text}") as resp:
            print(resp)
    print("call_external_server finished")
    return {"response": "external_server is done"}


@app.websocket("/ws")
async def websocket_endpoint(channel: str, websocket: WebSocket, background_task: BackgroundTasks):
    await websocket.accept()
    client_info = dict(websocket.headers)
    text = client_info.get("text")
    redis_reader: redis.client.PubSub = await get_redis_pubsub()
    await redis_reader.subscribe(channel)
    

    # Problem is Here 
    background_task.add_task(call_external_server, channel, text)
    # Background task Doesn't work properly 

    try:
        while True:
            message = await redis_reader.get_message(ignore_subscribe_messages=True)
            if message is not None:
                decoded_msg = message["data"].decode()
                if decoded_msg == STOPWORD:
                    print("(Reader) STOP")
                    break
                await websocket.send_text(decoded_msg)
    except Exception as e:
        print(e)
        await websocket.close()
        return
    await websocket.close()
    return

python websocket fastapi background-task starlette
1个回答
0
投票

A

BackgroundTask
在向客户端的 request 返回响应
之后执行(不是在正在进行的 
websocket
 连接上)—请参阅 
这个答案,以及 这个答案 和这个相关评论 了解更多信息有关后台任务的详细信息。正如@tiangolo(FastAPI 的创建者)所解释的:

内部后台任务
依赖于

Request

,它们是
在将请求返回给客户端之后执行。如果是
WebSocket,因为 WebSocket 并未真正“完成”,但
 是一个
持续连接
,它不会是后台任务。

但是,您可以使用此答案

中描述的选项之一在后台执行函数。如果您的后台任务是
async def

函数,则应该使用选项 3(有关更多详细信息,请参阅上面的链接答案),即使用 asyncio.create_task():

async def call_external_server(channel, text):
    pass


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
   await websocket.accept()
   # ...
   asyncio.create_task(call_external_server(channel, text))
我还建议查看

这个答案
,以及
这个答案

这个答案,了解如何在应用程序启动时生成 HTTP 客户端并在每次需要时重用它,而不是创建每次调用端点时都会创建一个新连接(如您的问题所示)。

© www.soinside.com 2019 - 2024. All rights reserved.