我有一个 Django 服务(我们称之为“主”服务),它运行一个 HTTP 服务器和一个使用 django-channels 的 WebSocket 服务器。我还有一堆其他 django 服务(称为目标服务)想要连接到主服务公开的 websocket 通道。每个服务都需要与主服务建立多个websocket连接,只要连接不被主服务关闭就可以发送/接收消息。
以下是一些示例连接 URL:
URL1 = "wss://main-service.com/ws-connection/1"
URL2 = "wss://main-service.com/ws-connection/2"
URL3 = "wss://main-service.com/ws-connection/3"
此 URL 列表事先并不知晓;主服务向我们发送一个迷你负载,用于在运行时生成 URL,并建立连接。
目标服务需要能够维护数千个 Websocket 连接(充当有状态连接),并且还需要公开 HTTP API 来服务直接请求。
到目前为止,我已经得到了作为 Django 项目运行的目标服务 API,以及一个基本的 websocket 客户端,两者如下所示:
Websocket 客户端:
from abc import abstractmethod
from websocket import WebSocketApp as _WebSocketApp
class WebSocketApp(_WebSocketApp):
def __init__(self, url=None):
super(WebSocketApp, self).__init__(url=url, on_open=self._on_open on_message=self._on_message, on_error=self._on_error, on_close=self._on_close)
self.logger = get_logger() # TODO: set level
self.logger.debug(f"Websocket adapter is ready")
@abstractmethod
def _on_open(self, ws):
pass
@abstractmethod
def _on_message(self, ws, message):
pass
@abstractmethod
def _on_error(self, ws, error):
pass
@abstractmethod
def _on_close(self, ws, code, message):
pass
Django 视图:
from adapters.web_socket import WebSocketApp
from rest_framework.decorators import api_view
from server.celery import app
@app.task()
def ws_handler(conversation_id):
# TODO: standardise
url = f"wss://main-service.com/connections/{id}"
socket = WebSocketApp(url=url)
socket.run_forever()
@api_view(["POST"])
def index(request):
_id = request.data["_id"]
ws_handler.delay(id=_id)
# NOTE: verify connection via caching
这很有效,但是一旦建立新的 websocket 连接,Django 服务器就会被阻止。目前,我通过向 Celery 工作人员发送 websocket 连接请求来解决这个问题。我知道这不会有好结果,因为 Celery 会吃掉我的服务器,而且我很快就会失去工作线程,但它允许我保持我的 Django 服务器空闲。
相反,我真的很想把它变成一个优雅的异步 websocket 客户端,这样我就可以每个线程运行多个 websocket 连接,最好是数千个。我仍然想让 celery 工作人员为我完成数据处理工作,但我不想给它增加 websocket 管理的负担,所以我希望 websocket 的事情发生在 Celery 和 Django 服务器进程之外
这是我到目前为止整理的内容:
import pdb
import asyncio
import websockets
import json
class WebSocketAdapter:
def __init__(self, uri):
self.uri = uri
self.websocket = None
async def connect(self):
try:
self.websocket = await websockets.connect(self.uri)
await self.on_connect()
await self.receive_messages()
await self.receive_messages()
except Exception as e:
await self.on_error(e)
async def disconnect(self):
await self.websocket.close()
async def send_message(self, message):
if self.websocket is not None:
await self.websocket.send(json.dumps(message))
print("Message Sent")
else:
print("WebSocket connecion is not established")
async def receive_messages(self):
while True:
try:
message = await self.websocket.recv()
await self.on_message((message))
# asyncio.create_task(self.on_message(await self.websocket.recv()))
except websockets.exceptions.ConnectionClosed:
await self.on_close()
break
except Exception as e:
await self.on_error(e)
async def on_connect(self):
print("Connected to WebSocket server.")
async def on_message(self, message):
print(f"Received message from server: {message}")
async def on_error(self, error):
print("WebSocket error:", error)
async def on_close(self):
print("WebSocket connection closed.")
async def main(url):
websocket_adapter = WebSocketAdapter(url)
await websocket_adapter.connect()
# NOTE: following line of code is never executed :(
await websocket_adapter.send_message("Message from client.")
async def test():
urls = [
"ws://localhost:8765/conversations/1",
"ws://localhost:8765/conversations/2",
"ws://localhost:8765/conversations/3",
]
tasks = [main(url) for url in urls]
await asyncio.gather(*tasks)
# Run the event loop
asyncio.run(test())
# asyncio.get_event_loop().run_until_complete(test())
# pdb.set_trace()
它似乎允许我运行多个连接,但在初始握手后我无法发送/接收消息,代码在适配器内的
receive_message
方法处被阻止。另外,建立连接后我无法在适配器外部发送消息,这也意味着控制权永远不会返回给调用者,即 Django 服务器
我有一个简单的 websocket 适配器,它是同步的,并且与回调配合得非常好,但是使用 asyncio 我必须构建协程。我如何发回控制权,如何处理新的连接请求,以及如何在本地测试这个东西?
我基本上希望能够通过 HTTP 端点接收连接请求,在后台建立 websocket 连接,并继续接收新请求,即我应该能够使用可配置的回调初始化许多 websocket 客户端对象,并将它们用作我请。
为什么要使用websockets在应用程序之间传输消息,您可以使用像RabbitMQ这样的AMQP消息代理来传输这些消息。