服务器:
@app.get("/trader/MacroSSE")
async def trader_MacroSSE(item: traderlib.data_class.GeneralItem):
accesskey_name = accesskey_chack(item)
logger.info(f"accesskey_name: {accesskey_name}, item: {item}")
return StreamingResponse(traderlib.MacroSSE().messages_generator(item), media_type="text/event-stream")
traderlib.MacroSSE().messages_generator(item):
async def messages_generator(self,request):
self.accesskey = request["accesskey"]
logging.info(f"accesskey: {self.accesskey}")
yield f"data: {self.accesskey} is connected" + "\n\n"
while True:
current_time = time.strftime("%Y-%m-%d %H:%M:%S").encode('utf-8')
message = {"event": "time_data", "data": current_time}
logging.info(f"accesskey: {self.accesskey}, message: {message}")
yield "data: "+str(message)+"\n\n"
await asyncio.sleep(1)
客户:
import aiohttp
import asyncio
start_data = {
"accesskey": "*****"
}
async def sse_client(url, retry_interval=5):
async with aiohttp.ClientSession() as session:
while True:
try:
async with session.get(url, json=start_data) as response:
response.raise_for_status()
print(response)
async for line in response.content:
print(line)
except (
aiohttp.ClientError,
aiohttp.http_exceptions.HttpProcessingError,
) as e:
print(f"Network error: {e}")
except asyncio.TimeoutError:
print("Timeout error")
except Exception as e:
print(f"Unexpected error: {e}")
print(f"Reconnecting in {retry_interval} seconds...")
await asyncio.sleep(retry_interval)
url = "*****"
asyncio.run(sse_client(url))
服务端和客户端的代码编写如下。 当客户端向服务器发出请求时,返回以下信息,但连接立即终止。
<ClientResponse(*****) [200 OK]>
<CIMultiDictProxy('Server': 'openresty', 'Date': 'Wed, 24 Jan 2024 12:36:58 GMT', 'Content-Type': 'text/event-stream; charset=utf-8', ' Transfer-Encoding': 'chunked', 'Connection': 'keep-alive', 'X-Served-By': '*****')>
如何从服务器接收连续数据?
我在服务器日志中没有看到任何其他错误,并且连接立即关闭。
[2024-01-24 21:37:03,961][root][INFO|whitelist_chack():66] >> request_host: localhost_2, * * * * *, request_url: * * * * *
[2024-01-24 21:37:03,963][root][INFO|trader_MacroSSE():175] >> accesskey_name: coder, item: accesskey='* * * * *'
[2024-01-24 21:37:03,965][root][INFO|whitelist_chack():70] >> working time: 0.0042 second
客户端也没有错误。
不确定是否有人面临与我类似的问题。这是我能找到的唯一与我最接近的问题。所以我在这里写了这么多额外的信息,希望它能帮助其他寻找这个的人。
之前一直被fastAPI的sse问题困扰,终于得到了完美的解决方案。造成这种情况的原因主要有2个:
fastapi 的流响应由于某种未知的原因而表现不佳。 解决方案:使用
EventSourceResponse
代替。请参阅https://stackoverflow.com/a/75909687/21265332
aiohttp 不支持 sse 响应的解析。目前aio的sse的开源模块主要有2个问题:
\n
:实际上最好的方法是在屈服之前用 \n
来逃避 \\n
。但有时您仍然希望收到单行换行符,以使前端更易于处理。为了解决客户端问题,我做了一个pip模块
aiossechat
,它是从另一个包修改的,以便正确使用aiohttp prase sse。发现这个的人,请直接拨打pip install aiossechat
。