我的最终目标是编写代码,仅在调用end_point时触发其他服务器,并等待Redis中特定通道的数据进来。
我不想知道external_server的业务逻辑是否做得好,因为延迟
call_external_server是通知external_server发送数据到redis(pub/sub)的触发函数
但是后台任务不执行!
这是我的代码
async def call_external_server(channel, text):
print("call_external_server start")
async with aiohttp.ClientSession() as session:
async with session.get(f"http://localhost:9000/pub?channel={channel}&text={text}") as resp:
print(resp)
print("call_external_server finished")
return {"response": "external_server is done"}
@app.websocket("/ws")
async def websocket_endpoint(channel: str, websocket: WebSocket, background_task: BackgroundTasks):
await websocket.accept()
client_info = dict(websocket.headers)
text = client_info.get("text")
redis_reader: redis.client.PubSub = await get_redis_pubsub()
await redis_reader.subscribe(channel)
# Problem is Here
background_task.add_task(call_external_server, channel, text)
# Background task Doesn't work properly
try:
while True:
message = await redis_reader.get_message(ignore_subscribe_messages=True)
if message is not None:
decoded_msg = message["data"].decode()
if decoded_msg == STOPWORD:
print("(Reader) STOP")
break
await websocket.send_text(decoded_msg)
except Exception as e:
print(e)
await websocket.close()
return
await websocket.close()
return
A
BackgroundTask
在向客户端的 request
返回响应之后执行(不是在正在进行的
websocket
连接上)—请参阅这个答案,以及 这个答案 和这个相关评论 了解更多信息有关后台任务的详细信息。正如@tiangolo(FastAPI 的创建者)所解释的: 内部后台任务
依赖于
Request
,它们是 在将请求返回给客户端之后执行。如果是WebSocket
,因为WebSocket
并未真正“完成”,但是一个 持续连接,它不会是后台任务。 但是,您可以使用此答案中描述的选项之一在后台执行函数。如果您的后台任务是
async def
函数,则应该使用选项 3(有关更多详细信息,请参阅上面的链接答案),即使用 asyncio.create_task()
:
async def call_external_server(channel, text):
pass
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
# ...
asyncio.create_task(call_external_server(channel, text))
我还建议查看这个答案,以及这个答案
和这个答案,了解如何在应用程序启动时生成 HTTP 客户端并在每次需要时重用它,而不是创建每次调用端点时都会创建一个新连接(如您的问题所示)。