使用 FastAPI 和 aiohttp 的 SSE 通信效果不佳

问题描述 投票:0回答:1

服务器:

@app.get("/trader/MacroSSE")
async def trader_MacroSSE(item: traderlib.data_class.GeneralItem):
    accesskey_name = accesskey_chack(item)
    logger.info(f"accesskey_name: {accesskey_name}, item: {item}")

    return StreamingResponse(traderlib.MacroSSE().messages_generator(item), media_type="text/event-stream")

traderlib.MacroSSE().messages_generator(item):

    async def messages_generator(self,request):
        self.accesskey = request["accesskey"]
        logging.info(f"accesskey: {self.accesskey}")
        yield f"data: {self.accesskey} is connected" + "\n\n"
        while True:
            current_time = time.strftime("%Y-%m-%d %H:%M:%S").encode('utf-8')
            message = {"event": "time_data", "data": current_time}
            logging.info(f"accesskey: {self.accesskey}, message: {message}")
            yield "data: "+str(message)+"\n\n"
            await asyncio.sleep(1)

客户:

import aiohttp
import asyncio

start_data = {
    "accesskey": "*****"
}


async def sse_client(url, retry_interval=5):
    async with aiohttp.ClientSession() as session:
        while True:
            try:
                async with session.get(url, json=start_data) as response:
                    response.raise_for_status()
                    print(response)

                    async for line in response.content:
                        print(line)

            except (
                aiohttp.ClientError,
                aiohttp.http_exceptions.HttpProcessingError,
            ) as e:
                print(f"Network error: {e}")
            except asyncio.TimeoutError:
                print("Timeout error")
            except Exception as e:
                print(f"Unexpected error: {e}")

            print(f"Reconnecting in {retry_interval} seconds...")
            await asyncio.sleep(retry_interval)


url = "*****"
asyncio.run(sse_client(url))

服务端和客户端的代码编写如下。 当客户端向服务器发出请求时,返回以下信息,但连接立即终止。

<ClientResponse(*****) [200 OK]>
<CIMultiDictProxy('Server': 'openresty', 'Date': 'Wed, 24 Jan 2024 12:36:58 GMT', 'Content-Type': 'text/event-stream; charset=utf-8', ' Transfer-Encoding': 'chunked', 'Connection': 'keep-alive', 'X-Served-By': '*****')>

如何从服务器接收连续数据?

我在服务器日志中没有看到任何其他错误,并且连接立即关闭。

[2024-01-24 21:37:03,961][root][INFO|whitelist_chack():66] >> request_host: localhost_2, * * * * *, request_url: * * * * *
[2024-01-24 21:37:03,963][root][INFO|trader_MacroSSE():175] >> accesskey_name: coder, item: accesskey='* * * * *'
[2024-01-24 21:37:03,965][root][INFO|whitelist_chack():70] >> working time: 0.0042 second

客户端也没有错误。

python python-asyncio fastapi server-sent-events aiohttp
1个回答
0
投票

不确定是否有人面临与我类似的问题。这是我能找到的唯一与我最接近的问题。所以我在这里写了这么多额外的信息,希望它能帮助其他寻找这个的人。


之前一直被fastAPI的sse问题困扰,终于得到了完美的解决方案。造成这种情况的原因主要有2个:

  1. fastapi 的流响应由于某种未知的原因而表现不佳。 解决方案:使用

    EventSourceResponse
    代替。请参阅https://stackoverflow.com/a/75909687/21265332

  2. aiohttp 不支持 sse 响应的解析。目前aio的sse的开源模块主要有2个问题:

    • 在“GET”之外的方法中引发错误:传统上认为SSE仅支持GET,但事实是您可以使用任何方法类型响应SSE。这对于在某些流案例中使用“POST”而不是“GET”非常有帮助,特别是对于像我这样使用 FastAPI 构建自己的 LLM AI 服务的人。
    • Prasing 会跳过单个
      \n
      :实际上最好的方法是在屈服之前用
      \n
      来逃避
      \\n
      。但有时您仍然希望收到单行换行符,以使前端更易于处理。

为了解决客户端问题,我做了一个pip模块

aiossechat
,它是从另一个包修改的,以便正确使用aiohttp prase sse。发现这个的人,请直接拨打
pip install aiossechat

© www.soinside.com 2019 - 2024. All rights reserved.