如何运行多个websocket客户端连接?

问题描述 投票:0回答:1

我有一个 Django 服务(我们称之为“主”服务),它运行一个 HTTP 服务器和一个使用 django-channels 的 WebSocket 服务器。我还有一堆其他 django 服务(称为目标服务)想要连接到主服务公开的 websocket 通道。每个服务都需要与主服务建立多个websocket连接,只要连接不被主服务关闭就可以发送/接收消息。

以下是一些示例连接 URL:

URL1 = "wss://main-service.com/ws-connection/1"
URL2 = "wss://main-service.com/ws-connection/2"
URL3 = "wss://main-service.com/ws-connection/3"

此 URL 列表事先并不知晓;主服务向我们发送一个迷你负载,用于在运行时生成 URL,并建立连接。

目标服务需要能够维护数千个 Websocket 连接(充当有状态连接),并且还需要公开 HTTP API 来服务直接请求。

到目前为止,我已经得到了作为 Django 项目运行的目标服务 API,以及一个基本的 websocket 客户端,两者如下所示:

Websocket 客户端:

from abc import abstractmethod
from websocket import WebSocketApp as _WebSocketApp


class WebSocketApp(_WebSocketApp):
    def __init__(self, url=None):
            super(WebSocketApp, self).__init__(url=url, on_open=self._on_open on_message=self._on_message, on_error=self._on_error, on_close=self._on_close)
        self.logger = get_logger()  # TODO: set level
        self.logger.debug(f"Websocket adapter is ready")

    @abstractmethod
    def _on_open(self, ws):
        pass

    @abstractmethod
    def _on_message(self, ws, message):
        pass

    @abstractmethod
    def _on_error(self, ws, error):
        pass

    @abstractmethod
    def _on_close(self, ws, code, message):
        pass

Django 视图:

from adapters.web_socket import WebSocketApp
from rest_framework.decorators import api_view
from server.celery import app


@app.task()
def ws_handler(conversation_id):
    # TODO: standardise
    url = f"wss://main-service.com/connections/{id}"
    socket = WebSocketApp(url=url)
    socket.run_forever()


@api_view(["POST"])
def index(request):
    _id = request.data["_id"]
    ws_handler.delay(id=_id)
        # NOTE: verify connection via caching  

这很有效,但是一旦建立新的 websocket 连接,Django 服务器就会被阻止。目前,我通过向 Celery 工作人员发送 websocket 连接请求来解决这个问题。我知道这不会有好结果,因为 Celery 会吃掉我的服务器,而且我很快就会失去工作线程,但它允许我保持我的 Django 服务器空闲。

相反,我真的很想把它变成一个优雅的异步 websocket 客户端,这样我就可以每个线程运行多个 websocket 连接,最好是数千个。我仍然想让 celery 工作人员为我完成数据处理工作,但我不想给它增加 websocket 管理的负担,所以我希望 websocket 的事情发生在 Celery 和 Django 服务器进程之外

这是我到目前为止整理的内容:

import pdb
import asyncio
import websockets
import json


class WebSocketAdapter:
    def __init__(self, uri):
        self.uri = uri
        self.websocket = None

    async def connect(self):
        try:
            self.websocket = await websockets.connect(self.uri)
            await self.on_connect()
            await self.receive_messages()
            await self.receive_messages()
        except Exception as e:
            await self.on_error(e)

    async def disconnect(self):
        await self.websocket.close()

    async def send_message(self, message):
        if self.websocket is not None:
            await self.websocket.send(json.dumps(message))
            print("Message Sent")
        else:
            print("WebSocket connecion is not established")

    async def receive_messages(self):
        while True:
            try:
                message = await self.websocket.recv()
                await self.on_message((message))
                # asyncio.create_task(self.on_message(await self.websocket.recv()))
            except websockets.exceptions.ConnectionClosed:
                await self.on_close()
                break
            except Exception as e:
                await self.on_error(e)

    async def on_connect(self):
        print("Connected to WebSocket server.")

    async def on_message(self, message):
        print(f"Received message from server: {message}")

    async def on_error(self, error):
        print("WebSocket error:", error)

    async def on_close(self):
        print("WebSocket connection closed.")


async def main(url):
    websocket_adapter = WebSocketAdapter(url)
    await websocket_adapter.connect()
    # NOTE: following line of code is never executed :(
    await websocket_adapter.send_message("Message from client.")



async def test():

    urls = [
        "ws://localhost:8765/conversations/1",
        "ws://localhost:8765/conversations/2",
        "ws://localhost:8765/conversations/3",
    ]
    tasks = [main(url) for url in urls]
    await asyncio.gather(*tasks)


# Run the event loop
asyncio.run(test())
# asyncio.get_event_loop().run_until_complete(test())
# pdb.set_trace()
        

它似乎允许我运行多个连接,但在初始握手后我无法发送/接收消息,代码在适配器内的

receive_message
方法处被阻止。另外,建立连接后我无法在适配器外部发送消息,这也意味着控制权永远不会返回给调用者,即 Django 服务器

我有一个简单的 websocket 适配器,它是同步的,并且与回调配合得非常好,但是使用 asyncio 我必须构建协程。我如何发回控制权,如何处理新的连接请求,以及如何在本地测试这个东西?

我基本上希望能够通过 HTTP 端点接收连接请求,在后台建立 websocket 连接,并继续接收新请求,即我应该能够使用可配置的回调初始化许多 websocket 客户端对象,并将它们用作我请。

python django websocket python-asyncio
1个回答
0
投票

为什么要使用websockets在应用程序之间传输消息,您可以使用像RabbitMQ这样的AMQP消息代理来传输这些消息。

© www.soinside.com 2019 - 2024. All rights reserved.