Python - WebSocket 的异步回调/接收器

问题描述 投票:0回答:1

我正在尝试实现与服务器的 WebSocket 连接(Python 应用程序 <=> Django 应用程序)

整个系统在包含许多任务的大 Asyncio 循环中运行。代码片段只是非常小的测试部分。

我可以随时向服务器发送任何数据,其中许多数据将键入请求内容并等待响应。但我希望有一些“始终运行”的处理程序来处理所有传入的消息。 (当 Django 数据库中的某些内容发生更改时,我想将更改发送到 python 应用程序)。

如何包含始终运行的接收器/或向 websocket 添加回调?我无法找到任何解决方案。

我的代码片段:

import asyncio, json, websockets, logging

class UpdateConnection:

    async def connect(self,botName):
        self.sock = await websockets.connect('ws://localhost:8000/updates/bot/'+botName)
        
    async def send(self,data):
        try:
            await self.sock.send(json.dumps(data))
        except:
            logging.info("Websocket connection lost!")
            # Find a way how to reconenct... or make socket reconnect automatically

            
if __name__ == '__main__':
    async def DebugLoop(socketCon):
        await socketCon.connect("dev")
        print("Running..")
        while True:
            data = {"type": "debug"}
            await socketCon.send(data)
            await asyncio.sleep(1)

    uSocket = UpdateConnection()
    loop = asyncio.get_event_loop()
    loop.create_task(DebugLoop(uSocket))
    loop.run_forever()

连接后我的调试服务器将开始以随机间隔向客户端发送随机消息,我想以异步方式处理它们。

感谢您的帮助:)

python websocket async-await receiver
1个回答
4
投票

没必要这么复杂。首先,我建议您使用

websockets
模块提供的上下文模式。

来自文档:

connect() 可以用作无限异步迭代器,以在出现错误时自动重新连接:

async for websocket in websockets.connect(...):
    try:
        ...
    except websockets.ConnectionClosed:
        continue

此外,您只需通过等待传入消息来保持连接处于活动状态即可:

my_websocket = None

async for websocket in websockets.connect('ws://localhost:8000/updates/bot/' + botName):
    try:
        my_websocket = websocket
        async for message in websocket:
            pass # here you could also process incoming messages
    except websockets.ConnectionClosed:
        my_websocket = None
        continue

如您所见,我们这里有一个嵌套循环:

  1. 外循环不断重连服务器
  2. 内循环一次处理一条传入消息

如果您已连接,并且没有消息从服务器传入,则它只会休眠。

这里发生的另一件事是

my_websocket
设置为活动连接,并在连接丢失时再次取消设置。 在脚本的其他部分,您可以使用
my_websocket
发送数据。请注意,无论您在何处使用它,您都需要检查当前是否已设置:

async def send(data):
    if my_websocket:
        await my_websocket.send(json.dumps(data))

这只是一个示例,你也可以将

websocket
对象保留为对象成员,或者通过 setter 函数将其传递给另一个组件等

© www.soinside.com 2019 - 2024. All rights reserved.