FastAPI Websocket 端点内异步任务未正确退出

问题描述 投票:0回答:1

我有一个 fastapi 应用程序,它创建由反应前端发起的 websocket 连接。在其中,我监听某个 redis pubsub 通道,每当发布消息时,我都会使用相同的 websocket 对象将该消息发送到前端。这按预期工作。

async def reader(channel: aioredis.client.PubSub, websocket: WebSocket):
    while True:
        try:
            async with async_timeout.timeout(1):
                message = await channel.get_message(ignore_subscribe_messages=True)
                if message is not None and message['type'] == 'message':    
                    await websocket.send_text(message['data'].decode('utf-8'))

        except asyncio.TimeoutError:
            pass
            
        await asyncio.sleep(0.01)


@router.websocket("/api/ws/{id}")
async def websocket_endpoint(websocket: WebSocket, station_id: str, redis: aioredis.Redis = Depends(get_redis)):
    await websocket.accept()
    channel_name = f'updates:{id}'

    try:
        pubsub = redis.pubsub()
        await pubsub.subscribe(channel_name)

        await reader(pubsub, websocket)
    except WebSocketDisconnect:
        print("WebSocket disconnected by the client.")
    except asyncio.CancelledError:          
        print("WebSocket connection cancelled.")
    except Exception as e:  
        print('WebSocket error:', e)    
    finally:
        await pubsub.unsubscribe(channel_name)
        await pubsub.close()
        await websocket.close()

发布到 pubsub 的消息是在另一个 api 端点启动的 celery 任务中创建的。这按预期工作。

每当我尝试退出 fastapi 应用程序(或者监视文件检测到更改并导致重新加载)时,我都会遇到问题,无法退出异步操作。

这是独角兽和错误输出:

WARNING:  WatchFiles detected changes in 'server/routes/test_routes.py'. Reloading...
INFO:     Shutting down
INFO:     connection closed
INFO:     Waiting for background tasks to complete. (CTRL+C to force quit) # <--- FREEZES HERE
^CINFO:     Finished server process [3639091]
ERROR:    Traceback (most recent call last):
  File "XXXXX/env/lib/python3.10/site-packages/starlette/routing.py", line 747, in lifespan
    await receive()
  File "XXXXX/env/lib/python3.10/site-packages/uvicorn/lifespan/on.py", line 137, in receive
    return await self.receive_queue.get()
  File "/usr/lib/python3.10/asyncio/queues.py", line 159, in get
    await getter
asyncio.exceptions.CancelledError

WebSocket connection cancelled.
INFO:     Stopping reloader process [3638971]

我尝试过不同配置的redis-py和aioredis,有的无法发送到前端但能够正常关闭,有的可以通过socket发送但无法正常关闭。

我研究了 FastAPI 的生命周期来分配和取消分配 Redis 资源,但遇到了问题,因为它永远不会达到关闭点。

目前还没有什么问题,但任何有关此问题的帮助将不胜感激!

python asynchronous websocket redis fastapi
1个回答
0
投票

在您的代码中,reader 函数是一个无限循环,不断侦听来自 Redis 的消息。当服务器关闭时,它会等待后台任务完成,但由于读取器任务仍在运行,因此它永远不会完成,导致服务器关闭过程冻结。

要解决这个问题,需要在服务器关闭时正确处理WebSocket阅读器任务的关闭。一种方法是在服务器关闭时使用标志来指示任务停止。

© www.soinside.com 2019 - 2024. All rights reserved.