我正在开发一个项目,我想实现一个系统,该系统可以从使用 Mosquitto 实现的 MQTT 代理接收消息,并将它们反应性地转发到 Web 浏览器,而无需诉诸轮询。具体来说,MQTT 代理发送的消息是 base64 编码的图像,并遵循以下结构:
{'source': 'source_name', 'measure':'image(base64 encoded)', 'timestamp':timestamp}
我们的目标是让 Python 服务器能够反应性地接收这些消息,并通过 WebSocket 连接将它们转发到 Web 客户端。此外,我希望 Web 客户端能够向 Python 服务器发送消息。
目前,我正在尝试使用用于 Python 服务器的 FastAPI、用于管理 MQTT 代理的 FastMQTT 以及用于与 Web 客户端通信的 WebSocket 来实现此解决方案。这是我到目前为止编写的代码:
from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse
from fastapi_mqtt.fastmqtt import FastMQTT
from fastapi_mqtt.config import MQTTConfig
import uvicorn
app = FastAPI()
mqtt_config = MQTTConfig(host="localhost", port=1883)
fast_mqtt = FastMQTT(config=mqtt_config)
fast_mqtt.init_app(app)
messages = []
@fast_mqtt.on_connect()
def connect(client, flags, rc, properties):
print("Connected: ", client, flags, rc, properties)
@fast_mqtt.on_disconnect()
def disconnect(client, packet, exc=None):
print("Disconnected")
@fast_mqtt.subscribe("room1/cam")
async def cam_handler(client, topic, payload, qos, properties):
print("Received message on topic: ", topic)
messages.append(payload.decode())
html = """
<!DOCTYPE html>
<html>
<head>
<title>WebSocket Test</title>
</head>
<body>
<h1>WebSocket Test</h1>
<script>
document.addEventListener("DOMContentLoaded", function(event) {
var ws = new WebSocket("ws://localhost:8080/ws");
ws.onopen = function(event) {
console.log("WebSocket connection established.");
};
ws.onmessage = function(event) {
console.log("Message received:", event.data);
// Handle incoming messages as needed
};
ws.onerror = function(event) {
console.error("WebSocket error:", event);
};
ws.onclose = function(event) {
console.log("WebSocket connection closed.");
};
});
</script>
</body>
</html>
"""
@app.get("/")
async def root():
return HTMLResponse(html)
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
while messages:
message = messages.pop(0)
await websocket.send_text(message)
if __name__ == "__main__":
uvicorn.run(app, host="localhost", port=8080)
但是,我遇到了一个问题:一旦 Web 客户端连接到 Python 服务器,WebSocket 处理程序中的
while True
循环似乎会阻塞,从而阻止其他代码块的执行。这会导致 MQTT 消息接收受阻,无法处理并将其发送到 Web 客户端。
我正在寻找一种解决方案,允许 Python 服务器响应式处理 MQTT 消息接收和与 Web 客户端的通信,避免轮询并确保组件之间的数据连续流动。
如果您能提供有关如何解决此问题的建议,或者提供更有效的替代方案来实现 MQTT 代理、Python 服务器和 Web 客户端之间的通信,我将不胜感激。
预先感谢您的帮助!
尝试以下代码:
messages = []
while messages:
message = messages.pop(0)
print(message)
print("done")
输出将“完成”,因为
messages
为空。我希望这就是您的代码中发生的情况(websocket_endpoint
接受连接然后退出;如果您随后向messages
添加一些内容,则不会对此进行任何检查)。
您的方法还有另一个问题:当第二个 Web 客户端连接时会发生什么?
您可能需要某种形式的连接管理器,如此示例所示。以下是采用示例 (license) 的粗略概述,并展示了如何使用它:
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await manager.connect(websocket)
try:
while True:
await websocket.receive_text()
# not sure what, if anything you want to do with received data
except WebSocketDisconnect:
manager.disconnect(websocket)
然后是经理
class ConnectionManager:
def __init__(self):
self.active_connections: list[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
# Call broadcast when a message is received via MQTT
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)