有没有办法让 MQTT 客户端向 Tornado WebSocket 发送消息?使用下面这段代码时,Tornado 可以正常工作,但 MQTT 收到的消息没有被转发出去。
import os.path
import tornado.httpserver
import tornado.web
import tornado.ioloop
import tornado.options
import tornado.httpclient
import tornado.websocket
import json
import random
from paho.mqtt import client as mqtt_client
# MQTT connection settings; the 'x' values are placeholders to be filled in.
broker = 'xxxxxxx'  # broker address passed to clientMQTT.connect()
port = 1883  # broker TCP port passed to clientMQTT.connect()
topic = "xxx/xxxx"  # topic subscribed to in subscribe()
client_id = f'xxxx-{random.randint(0, 100)}'  # client id with a random 0-100 suffix
# WebSocket handlers currently connected: SocketHandler.open() appends,
# SocketHandler.on_close() removes, send_to_all_clients() iterates.
clients = []
def connect_mqtt() -> mqtt_client.Client:
    """Create the paho MQTT client and call connect() on it.

    Uses the module-level ``broker``, ``port`` and ``client_id`` settings.

    Returns:
        The client instance. Its network loop is not started here; the
        caller is responsible for that.
    """
    def on_connect(clientMQTT, userdata, flags, rc):
        # Only rc == 0 is reported as a successful connection.
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            # print() does not apply %-formatting to extra positional
            # arguments, so the return code must be formatted explicitly.
            print("Failed to connect, return code %d\n" % rc)

    clientMQTT = mqtt_client.Client(client_id)
    # clientMQTT.username_pw_set(username, password)
    clientMQTT.on_connect = on_connect
    clientMQTT.connect(broker, port)
    return clientMQTT
def subscribe(clientMQTT: mqtt_client.Client):
    """Subscribe to ``topic`` and relay every received payload to the WebSocket clients.

    Call this from the thread that will run the Tornado IOLoop: the loop of
    the calling thread is captured and used to deliver the messages.

    Args:
        clientMQTT: the paho client to subscribe and attach ``on_message`` to.
    """
    # Captured here, in the caller's thread; IOLoop.current() inside the
    # callback would be evaluated on paho's thread instead.
    io_loop = tornado.ioloop.IOLoop.current()

    def on_message(clientMQTT, userdata, msg):
        payload = msg.payload.decode()
        print(f"Received `{payload}` from `{msg.topic}` topic")
        # With loop_start() this callback runs on paho's own thread, and
        # WebSocketHandler.write_message() is not thread-safe. add_callback()
        # is safe to call from any thread and runs the broadcast on the
        # IOLoop thread.
        io_loop.add_callback(send_to_all_clients, payload)

    clientMQTT.subscribe(topic)
    clientMQTT.on_message = on_message
def runMQTT():
    """Connect to the broker, register the subscription and start paho's loop."""
    mqtt_connection = connect_mqtt()
    subscribe(mqtt_connection)
    # loop_start() is used instead of the blocking loop_forever() so that
    # control returns to the caller, which still has to start Tornado.
    mqtt_connection.loop_start()
def send_to_all_clients(message):
    """Send ``message`` to every WebSocket handler registered in ``clients``.

    Args:
        message: payload handed to each handler's ``write_message()``.
    """
    # Iterate over a snapshot: SocketHandler.open()/on_close() mutate the list.
    for clientws in list(clients):
        try:
            clientws.write_message(message)
        except tornado.websocket.WebSocketClosedError:
            # This connection is already closed; skip it so the remaining
            # clients still receive the message.
            continue
class SocketHandler(tornado.websocket.WebSocketHandler):
    """WebSocket endpoint that keeps each open connection in ``clients``."""

    def open(self):
        """Register the new connection so broadcasts reach it."""
        print("new client")
        clients.append(self)

    def on_close(self):
        """Unregister the connection."""
        # Guarded removal: list.remove() raises ValueError when the handler
        # was never registered (on_close() without a completed open()).
        if self in clients:
            clients.remove(self)
        print("removing client")

    def on_message(self, message):
        """Ignore messages sent by the browser."""
        pass
        # for client in utility.clients:
        #     if client != self:
        #         client.write_message(msg)
if __name__ == '__main__':
    # Templates and static files are looked up next to this script.
    base_dir = os.path.dirname(__file__)
    app = tornado.web.Application(
        handlers=[
            (r"/monitor", SocketHandler),
        ],
        debug=True,
        template_path=os.path.join(base_dir, "templates"),
        static_path=os.path.join(base_dir, "static"),
    )
    # Start the MQTT side first, then serve WebSocket clients on port 8200.
    runMQTT()
    app.listen(8200)
    # IOLoop.instance() is a deprecated alias of IOLoop.current().
    tornado.ioloop.IOLoop.current().start()
希望有人能给我一些帮助,谢谢大家。
如果不使用 Tornado、单独监听 MQTT,是可以正常收到消息的。我希望能够通过 Tornado WebSocket 把来自 MQTT 的消息发送到浏览器。
需要注意:MQTT 客户端是在独立线程中工作的,而 Tornado 的
WebSocketHandler.write_message()
不是线程安全的。我认为 asyncio-mqtt
也帮不上忙,因为据我所知它同样使用线程。
因此,最简单的做法可能是使用
queue.Queue
在 MQTT 客户端和 Tornado 的 Worker 之间传递消息,再由 Worker 把消息发送给所有 WebSocket 连接。
import asyncio
import logging
import threading
from functools import partial
from queue import Queue, Empty
from tornado.ioloop import IOLoop
from tornado.web import RequestHandler, Application
from tornado.websocket import WebSocketHandler, WebSocketClosedError
from tornado.log import enable_pretty_logging
from tornado.options import define, options, parse_command_line
import paho.mqtt.client as mqtt
import paho.mqtt.publish as publish
from paho.mqtt.client import MQTTMessage
# Command-line options (tornado.options); values are read back as options.<name>.
define("port", default=3333, help="run on the given port", type=int)  # passed to app.listen() in main()
define("debug", default=False, help="run in debug mode")  # forwarded to Application(debug=...) in make_app()
define("autoreload", default=False, help="autoreload")  # NOTE(review): not read anywhere in the visible code
define("mqtt_host", default="localhost", help="MQTT host")  # passed to client.connect_async()
define("mqtt_topic", default="test", help="MQTT topic")  # topic used by subscribe()/unsubscribe()
# Demo page returned by MainHandler. It opens a WebSocket to /ws on the host
# and port the page itself was loaded from (so it follows the --port option)
# and shows the latest message. The message is assigned via textContent, not
# innerHTML, because it originates from MQTT and must not be parsed as HTML.
MAINPAGE = """
<html>
<script>
let ws = new WebSocket("ws://" + location.host + "/ws");
ws.onmessage = function(event) {
    console.log(`[message] Data received from tornado: ${event.data}`);
    document.getElementById("message").textContent = event.data;
};
ws.onerror = function(error) {
    console.log(error);
    alert(error);
};
</script>
<body>
<h3>message from MQTT:</h3>
<p id="message"></p>
</body>
</html>
"""
class MainHandler(RequestHandler):
    """Serves the demo HTML page."""

    async def get(self):
        """Answer a GET request with the MAINPAGE document and finish the request."""
        page = MAINPAGE
        self.finish(page)
# Bounded hand-off buffer: mqtt_on_message() puts messages in, Worker.work()
# takes them out.
Q = Queue(maxsize=100)


def mqtt_on_message(client, userdata, msg: "MQTTMessage"):
    """paho ``on_message`` callback: queue the received message for the Worker.

    The message is stored as ``{"message": <decoded payload>, "topic": <topic>}``.
    If the queue is full, the message is dropped with a warning instead of
    raising inside the MQTT callback.

    Args:
        client: the paho client that received the message (unused).
        userdata: paho user data (unused).
        msg: the received message; ``topic`` and ``payload`` are read.
    """
    from queue import Full  # only Queue and Empty are imported at file level

    logging.info(
        f"thread_id={threading.get_ident()} MQTT client received message: {msg.topic} {msg.payload}"
    )
    # errors="replace": a payload that is not valid UTF-8 must not raise here.
    message = msg.payload.decode(errors="replace")
    data = dict(message=message, topic=msg.topic)
    try:
        Q.put_nowait(data)
    except Full:
        logging.warning(
            f"thread_id={threading.get_ident()} queue is full, dropping message from {msg.topic}"
        )
def mqtt_on_connect(client, userdata, flags, rc):
    """paho ``on_connect`` callback: subscribe once the connection is accepted.

    Args:
        client: the paho client whose connection attempt finished.
        userdata: paho user data (unused).
        flags: connection flags from the broker (unused).
        rc: connection result code; 0 means the connection was accepted.
    """
    if rc != 0:
        # The connection was refused: report it rather than claiming success
        # and subscribing on a connection that does not exist.
        logging.error(f"{threading.get_ident()} MQTT connection failed, return code {rc}")
        return
    logging.info(f"{threading.get_ident()} MQTT connected to topic")
    client.subscribe(options.mqtt_topic)
def mqtt_start_client():
    """Create the paho client, attach the callbacks and start its loop.

    Returns:
        The started client, to be passed to mqtt_stop_client() on shutdown.
    """
    connection = mqtt.Client()
    connection.on_message = mqtt_on_message
    connection.on_connect = mqtt_on_connect
    # connect_async() plus loop_start(): the connection is established by the
    # client's loop rather than by a blocking call here.
    connection.connect_async(options.mqtt_host)
    connection.loop_start()
    return connection
def mqtt_stop_client(client):
    """Unsubscribe, disconnect from the broker and stop the client's loop.

    Args:
        client: the client returned by mqtt_start_client().
    """
    client.unsubscribe(options.mqtt_topic)
    # Tell the broker we are leaving before stopping the loop, so the
    # session ends with a clean disconnect instead of a dropped socket.
    client.disconnect()
    client.loop_stop(force=True)
# WebSocket handlers that are currently open: SimpleWebSocket adds and removes
# itself, Worker.work() sends every queued message to each member.
WEB_CLIENTS = set()


class Worker:
    """Moves messages from the queue ``Q`` to all connected WebSocket clients."""

    def __init__(self):
        # Defined up-front so work()/stop() are safe to call before start().
        self.should_work = False
        # Strong references to in-flight writes until they complete.
        self._pending_writes = set()

    def _finish_write(self, write_future):
        """Forget a completed write and log its failure, if any."""
        self._pending_writes.discard(write_future)
        if write_future.cancelled():
            return
        error = write_future.exception()
        if error is not None:
            logging.info(f"websocket write failed: {error!r}")

    async def work(self):
        """Poll ``Q`` until stop() is called and broadcast every message."""
        io_loop = IOLoop.current()
        # Q.get() blocks, so it runs in the default executor. The 1 s timeout
        # lets the loop re-check should_work regularly.
        poll_queue = partial(Q.get, block=True, timeout=1)
        while self.should_work:
            try:
                msg = await io_loop.run_in_executor(None, poll_queue)
            except Empty:
                continue
            logging.info(f"thread_id={threading.get_ident()} tornado got message from MQTT: {msg}")
            for ws in WEB_CLIENTS:
                try:
                    pending = ws.write_message(msg)
                except WebSocketClosedError:
                    # A closed client must not end the worker loop or keep
                    # the other clients from receiving the message.
                    continue
                self._pending_writes.add(pending)
                pending.add_done_callback(self._finish_write)

    def start(self):
        """Schedule work() on the current IOLoop."""
        self.should_work = True
        IOLoop.current().add_callback(self.work)

    def stop(self):
        """Ask work() to exit after its current poll."""
        self.should_work = False
class SimpleWebSocket(WebSocketHandler):
    """WebSocket endpoint that registers each open connection in ``WEB_CLIENTS``."""

    async def open(self):
        """Register the connection so the Worker sends MQTT messages to it."""
        WEB_CLIENTS.add(self)

    async def on_message(self, message):
        """Log a message sent by the browser; it is not forwarded anywhere."""
        logging.info(f"tornado got message from websocket: {message}")
        # Disabled example of publishing the browser message to MQTT:
        # def send_to_mqtt(message):
        #     publish.single(options.mqtt_topic, payload=message, hostname=options.mqtt_host)
        #
        # try:
        #     await IOLoop.current().run_in_executor(None, send_to_mqtt, message)
        # except asyncio.TimeoutError:
        #     logging.info("sending to topic timeout")

    def on_close(self):
        """Unregister the connection."""
        # discard(), not remove(): must not raise KeyError when the handler
        # was never added (on_close() without a completed open()).
        WEB_CLIENTS.discard(self)

    def check_origin(self, origin):
        """Accept the WebSocket handshake regardless of ``origin``."""
        # NOTE(review): this disables the cross-origin check for every site;
        # restrict it to known origins before exposing the server publicly.
        return True
def make_app():
    """Build the Tornado application with the page and WebSocket routes.

    Returns:
        The configured Application instance.
    """
    routes = [
        (r"/", MainHandler),
        (r"/ws", SimpleWebSocket),
    ]
    settings = {"debug": options.debug, "websocket_ping_interval": 10}
    return Application(routes, **settings)
def main():
    """Start the web server, the MQTT client and the Worker, then run the IOLoop.

    Blocks until the IOLoop stops. The Worker and the MQTT client are shut
    down on the way out, whether the loop ended normally, by Ctrl-C or by
    another exception.
    """
    app = make_app()
    app.listen(options.port)
    client = None
    worker = None
    try:
        client = mqtt_start_client()
        worker = Worker()
        worker.start()
        # IOLoop.instance() is a deprecated alias of IOLoop.current().
        IOLoop.current().start()
    except KeyboardInterrupt:
        # Emit a newline on Ctrl-C before shutting down.
        print()
    finally:
        # In finally so cleanup also runs when something other than
        # KeyboardInterrupt ends the loop.
        if worker:
            worker.stop()
        if client:
            mqtt_stop_client(client)
if __name__ == "__main__":
    # Options must be parsed before main() reads them through `options`.
    parse_command_line()
    # Pretty log formatting is only switched on explicitly in debug mode.
    if options.debug:
        enable_pretty_logging(options)
    main()