我有一个 fastapi 应用程序,它创建由反应前端发起的 websocket 连接。在其中,我监听某个 redis pubsub 通道,每当发布消息时,我都会使用相同的 websocket 对象将该消息发送到前端。这按预期工作。
async def reader(channel: aioredis.client.PubSub, websocket: WebSocket):
while True:
try:
async with async_timeout.timeout(1):
message = await channel.get_message(ignore_subscribe_messages=True)
if message is not None and message['type'] == 'message':
await websocket.send_text(message['data'].decode('utf-8'))
except asyncio.TimeoutError:
pass
await asyncio.sleep(0.01)
@router.websocket("/api/ws/{id}")
async def websocket_endpoint(websocket: WebSocket, station_id: str, redis: aioredis.Redis = Depends(get_redis)):
await websocket.accept()
channel_name = f'updates:{id}'
try:
pubsub = redis.pubsub()
await pubsub.subscribe(channel_name)
await reader(pubsub, websocket)
except WebSocketDisconnect:
print("WebSocket disconnected by the client.")
except asyncio.CancelledError:
print("WebSocket connection cancelled.")
except Exception as e:
print('WebSocket error:', e)
finally:
await pubsub.unsubscribe(channel_name)
await pubsub.close()
await websocket.close()
发布到 pubsub 的消息是在另一个 api 端点启动的 celery 任务中创建的。这按预期工作。
每当我尝试退出 fastapi 应用程序(或者监视文件检测到更改并导致重新加载)时,我都会遇到问题,无法退出异步操作。
这是独角兽和错误输出:
WARNING: WatchFiles detected changes in 'server/routes/test_routes.py'. Reloading...
INFO: Shutting down
INFO: connection closed
INFO: Waiting for background tasks to complete. (CTRL+C to force quit) # <--- FREEZES HERE
^CINFO: Finished server process [3639091]
ERROR: Traceback (most recent call last):
File "XXXXX/env/lib/python3.10/site-packages/starlette/routing.py", line 747, in lifespan
await receive()
File "XXXXX/env/lib/python3.10/site-packages/uvicorn/lifespan/on.py", line 137, in receive
return await self.receive_queue.get()
File "/usr/lib/python3.10/asyncio/queues.py", line 159, in get
await getter
asyncio.exceptions.CancelledError
WebSocket connection cancelled.
INFO: Stopping reloader process [3638971]
我尝试过不同配置的redis-py和aioredis,有的无法发送到前端但能够正常关闭,有的可以通过socket发送但无法正常关闭。
我研究了 FastAPI 的生命周期来分配和取消分配 Redis 资源,但遇到了问题,因为它永远不会达到关闭点。
目前还没有什么问题,但任何有关此问题的帮助将不胜感激!
在您的代码中,reader 函数是一个无限循环,不断侦听来自 Redis 的消息。当服务器关闭时,它会等待后台任务完成,但由于读取器任务仍在运行,因此它永远不会完成,导致服务器关闭过程冻结。
要解决这个问题,需要在服务器关闭时正确处理WebSocket阅读器任务的关闭。一种方法是在服务器关闭时使用标志来指示任务停止。