如何使用 WebSocket 在 Python 服务器、MQTT 代理和 Web 客户端之间实现反应式通信?

问题描述 投票:0回答:1

我正在开发一个项目,我想实现一个系统,该系统可以从使用 Mosquitto 实现的 MQTT 代理接收消息,并将它们反应性地转发到 Web 浏览器,而无需诉诸轮询。具体来说,MQTT 代理发送的消息是 base64 编码的图像,并遵循以下结构:

{'source': 'source_name', 'measure':'image(base64 encoded)', 'timestamp':timestamp}

我们的目标是让 Python 服务器能够反应性地接收这些消息,并通过 WebSocket 连接将它们转发到 Web 客户端。此外,我希望 Web 客户端能够向 Python 服务器发送消息。

目前,我正在尝试使用用于 Python 服务器的 FastAPI、用于管理 MQTT 代理的 FastMQTT 以及用于与 Web 客户端通信的 WebSocket 来实现此解决方案。这是我到目前为止编写的代码:

from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse
from fastapi_mqtt.fastmqtt import FastMQTT
from fastapi_mqtt.config import MQTTConfig
import uvicorn

app = FastAPI()

mqtt_config = MQTTConfig(host="localhost", port=1883)

fast_mqtt = FastMQTT(config=mqtt_config)

fast_mqtt.init_app(app)

messages = []

@fast_mqtt.on_connect()
def connect(client, flags, rc, properties):
    print("Connected: ", client, flags, rc, properties)

@fast_mqtt.on_disconnect()
def disconnect(client, packet, exc=None):
    print("Disconnected")

@fast_mqtt.subscribe("room1/cam")
async def cam_handler(client, topic, payload, qos, properties):
    print("Received message on topic: ", topic)
    messages.append(payload.decode())

html = """
<!DOCTYPE html>
<html>
<head> 
    <title>WebSocket Test</title>
</head>
<body>
    <h1>WebSocket Test</h1>

    <script>
        document.addEventListener("DOMContentLoaded", function(event) {
            var ws = new WebSocket("ws://localhost:8080/ws");
            ws.onopen = function(event) {
                console.log("WebSocket connection established.");
            };
            ws.onmessage = function(event) {
                console.log("Message received:", event.data);
                // Handle incoming messages as needed
            };
            ws.onerror = function(event) {
                console.error("WebSocket error:", event);
            };
            ws.onclose = function(event) {
                console.log("WebSocket connection closed.");
            };
        });
    </script>
</body>
</html>
"""

@app.get("/")
async def root():
    return HTMLResponse(html)

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    while messages:
        message = messages.pop(0)
        await websocket.send_text(message)
            

if __name__ == "__main__":
    uvicorn.run(app, host="localhost", port=8080)

但是,我遇到了一个问题:一旦 Web 客户端连接到 Python 服务器,WebSocket 处理程序中的

while True
循环似乎会阻塞,从而阻止其他代码块的执行。这会导致 MQTT 消息接收受阻,无法处理并将其发送到 Web 客户端。

我正在寻找一种解决方案,允许 Python 服务器响应式处理 MQTT 消息接收和与 Web 客户端的通信,避免轮询并确保组件之间的数据连续流动。

如果您能提供有关如何解决此问题的建议,或者提供更有效的替代方案来实现 MQTT 代理、Python 服务器和 Web 客户端之间的通信,我将不胜感激。

预先感谢您的帮助!

python websocket fastapi mqtt mosquitto
1个回答
0
投票

尝试以下代码:

messages = []
while messages:
   message = messages.pop(0)
   print(message)

print("done")   

输出将“完成”,因为

messages
为空。我希望这就是您的代码中发生的情况(
websocket_endpoint
接受连接然后退出;如果您随后向
messages
添加一些内容,则不会对此进行任何检查)。

您的方法还有另一个问题:当第二个 Web 客户端连接时会发生什么?

您可能需要某种形式的连接管理器,如此示例所示。以下是采用示例 (license) 的粗略概述,并展示了如何使用它:

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await manager.connect(websocket)
    try:
        while True:
            await websocket.receive_text()
            # not sure what, if anything you want to do with received data
    except WebSocketDisconnect:
        manager.disconnect(websocket)

然后是经理

class ConnectionManager:
    def __init__(self):
        self.active_connections: list[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    # Call broadcast when a message is received via MQTT
    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)
© www.soinside.com 2019 - 2024. All rights reserved.