常规MQTT和tornado websocket如何通信?

问题描述 投票:0回答:1

有没有办法让mqtt客户端向tornado websocket发送消息? 使用这段代码,龙卷风就可以工作了。而 MQTT 则没有消息传递。

import os.path
import tornado.httpserver
import tornado.web
import tornado.ioloop
import tornado.options
import tornado.httpclient
import tornado.websocket
import json
import random
from paho.mqtt import client as mqtt_client


broker = 'xxxxxxx'
port = 1883
topic = "xxx/xxxx"

client_id = f'xxxx-{random.randint(0, 100)}'

clients = []

def connect_mqtt() -> mqtt_client:
    def on_connect(clientMQTT, userdata, flags, rc):
        if rc == 0:
            print("Connected to MQTT Broker!")
        else:
            print("Failed to connect, return code %d\n", rc)

    clientMQTT = mqtt_client.Client(client_id)
    # clientMQTT.username_pw_set(username, password)
    clientMQTT.on_connect = on_connect
    clientMQTT.connect(broker, port)
    return clientMQTT


def subscribe(clientMQTT: mqtt_client):
    def on_message(clientMQTT, userdata, msg):
        print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
        send_to_all_clients(msg.payload.decode())

    clientMQTT.subscribe(topic)
    clientMQTT.on_message = on_message


def runMQTT():
    clientMQTT = connect_mqtt()
    subscribe(clientMQTT)
    clientMQTT.loop_start()
    # clientMQTT.loop_forever()

def send_to_all_clients(message):
    for clientws in clients:
        clientws.write_message(message)


class SocketHandler(tornado.websocket.WebSocketHandler):
    def open(self):
        print("new client")
        clients.append(self)

    def on_close(self):
        clients.remove(self)
        print("removing client")

    def on_message(self, message):
        pass
        # for client in utility.clients:
        #     if client != self:
        #         client.write_message(msg)

if __name__ == '__main__':
    app = tornado.web.Application(
        handlers = [
            (r"/monitor", SocketHandler)
        ],
        debug = True,
        template_path = os.path.join(os.path.dirname(__file__), "templates"),
        static_path = os.path.join(os.path.dirname(__file__), "static")
    )
    runMQTT()
    app.listen(8200)
    tornado.ioloop.IOLoop.instance().start()

我希望有人能给我一些帮助。 谢谢大家

如果我尝试在没有龙卷风的情况下收听 mqtt,它会起作用。 我希望能够通过tornado websocket将消息从mqtt发送到浏览器。

python-3.x websocket mqtt tornado
1个回答
0
投票

您必须记住,MQTT 客户端使用线程工作,而 Tornado 的

WebSocketHandler.send_message()
不是线程安全的。我认为
asyncio-mqtt
不会有帮助,因为从我看来它也使用线程。

因此,最简单的可能是使用

queue.Queue
在 MQTT 客户端和 Tornado Worker 之间交换消息 - Worker 会将其发送到您所有的 WebSocket 连接。

import asyncio
import logging
import threading
from functools import partial
from queue import Queue, Empty

from tornado.ioloop import IOLoop
from tornado.web import RequestHandler, Application
from tornado.websocket import WebSocketHandler, WebSocketClosedError
from tornado.log import enable_pretty_logging
from tornado.options import define, options, parse_command_line

import paho.mqtt.client as mqtt
import paho.mqtt.publish as publish
from paho.mqtt.client import MQTTMessage

define("port", default=3333, help="run on the given port", type=int)
define("debug", default=False, help="run in debug mode")
define("autoreload", default=False, help="autoreload")
define("mqtt_host", default="localhost", help="MQTT host")
define("mqtt_topic", default="test", help="MQTT topic")


MAINPAGE = """
<html>
<script>
let ws = new WebSocket("ws://localhost:3333/ws");
ws.onmessage = function(event) {
  console.log(`[message] Data received from tornado: ${event.data}`);
  document.getElementById("message").innerHTML = event.data;
};
ws.onerror = function(error) {
  console.log(error);
  alert(error);
};
</script>
<body>
<h3>message from MQTT:</h3>
<p id="message"></p>
</body>
</html>
"""


class MainHandler(RequestHandler):
    async def get(self):
        self.finish(MAINPAGE)


Q = Queue(maxsize=100)


def mqtt_on_message(client, userdata, msg: MQTTMessage):
    logging.info(
        f"thread_id={threading.get_ident()} MQTT client received message: {msg.topic} {msg.payload}"
    )
    message = msg.payload.decode()
    data = dict(message=message, topic=msg.topic)
    Q.put_nowait(data)


def mqtt_on_connect(client, userdata, flags, rc):
    logging.info(f"{threading.get_ident()} MQTT connected to topic")
    client.subscribe(options.mqtt_topic)


def mqtt_start_client():
    client = mqtt.Client()
    client.on_connect = mqtt_on_connect
    client.on_message = mqtt_on_message
    client.connect_async(options.mqtt_host)
    client.loop_start()
    return client


def mqtt_stop_client(client):
    client.unsubscribe(options.mqtt_topic)
    client.loop_stop(force=True)


WEB_CLIENTS = set()


class Worker:
    async def work(self):
        while self.should_work:
            msg = None
            try:
                msg = await IOLoop.current().run_in_executor(
                    None, partial(Q.get, block=True, timeout=1)
                )
            except Empty:
                await asyncio.sleep(0.3)

            if msg:
                logging.info(f"thread_id={threading.get_ident()} tornado got message from MQTT: {msg}")
                background_tasks = set()
                for ws in WEB_CLIENTS:
                    task = ws.write_message(msg)
                    background_tasks.add(task)
                    task.add_done_callback(background_tasks.discard)

    def start(self):
        self.should_work = True
        IOLoop.current().add_callback(self.work)

    def stop(self):
        self.should_work = False


class SimpleWebSocket(WebSocketHandler):
    async def open(self):
        WEB_CLIENTS.add(self)

    async def on_message(self, message):
        logging.info(f"tornado got message from websocket: {message}")
        #def send_to_mqtt(message):
        #    publish.single(options.mqtt_topic, payload=message, hostname=options.mqtt_host)
        #
        #try:
        #    await IOLoop.current().run_in_executor(None, send_to_mqtt, message)
        #except asyncio.TimeoutError:
        #    logging.info("sending to topic timeout")

    def on_close(self):
        WEB_CLIENTS.remove(self)

    def check_origin(self, origin):
        return True


def make_app():
    app = Application(
        [
            (r"/", MainHandler),
            (r"/ws", SimpleWebSocket),
        ],
        **dict(debug=options.debug, websocket_ping_interval=10),
    )
    return app


def main():
    app = make_app()
    app.listen(options.port)
    client = None
    worker = None
    try:
        client = mqtt_start_client()
        worker = Worker()
        worker.start()
        IOLoop.instance().start()
    except KeyboardInterrupt:
        print()
    if worker:
        worker.stop()
    if client:
        mqtt_stop_client(client)


if __name__ == "__main__":
    parse_command_line()
    if options.debug:
        enable_pretty_logging(options)
    main()
© www.soinside.com 2019 - 2024. All rights reserved.