我从websocket服务器(A)接收实时数据。我想使用另一个Websocket服务器(B)转发该数据,该服务器将用于与客户端(Web应用程序)共享数据。
而不是在websocket服务器(A)上进行中继。每当我从“ A”接收数据时,我都会使用“ B”将其广播到客户端。
SubscribeToServerA()
While True :
Server_A_Callback() :
BroadcastDataViaServerB()
我对Websockets和其他东西都是新手,欢迎其他方法和建议
我相信基本上是您想要的东西:
from typing import List, Tuple
import websockets
import asyncio
async def server_a_accept(input_ws: websockets.WebSocketServerProtocol):
while True:
data = await input_ws.recv()
for client_ws, client_queue in clients:
# WARNING if you call await inside this for you will need to make a copy
# of the clients list before entry because it can mutate during iteration
client_queue.put_nowait(data)
async def server_b_accept(client_ws: websockets.WebSocketServerProtocol):
send_queue = asyncio.Queue()
clients.append((client_ws, send_queue))
while True:
data = await send_queue.get()
await client_ws.send(data)
send_queue.task_done()
server_a = websockets.serve(server_a_accept, ('127.0.0.1', 5001))
server_b = websockets.serve(server_b_accept, ('127.0.0.1', 5002))
clients: List[Tuple[websockets.WebSocketServerProtocol, asyncio.Queue]] = []
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(
[server_a.ws_server.wait_closed(), server_a.ws_server.wait_closed()],
return_when=asyncio.FIRST_COMPLETED
))
警告:我尚未运行/测试此代码,但它的结构和逻辑应该与您想要的非常接近。如果我的记忆正确地为您服务,则可能需要基于Websocket编写方式的loop.run_forever()
。 (在那种情况下,我可能会使用asyncio.Event作为共享停止布尔值)
警告:客户端速度慢可能会导致其队列备份。如果这是一个问题,您可能需要引入一些逻辑来捕获和断开慢速客户端或背压。
1)可行。我想象一个websocket位于一个安全或受信任的网络上,另一个位于不那么安全的网络上。这并不少见。
2)将数据存储在数据库中实际上取决于您。如果由于崩溃而错过了条目,那有多重要?如果单个python进程的吞吐量足够,并且丢失数据也不是什么大问题,那么上面的代码可能“足够好”。如果您需要更高的吞吐量,我可能会考虑使用Redis之类的东西来代替数据库。