使用 Paho 客户端进行同步 MQTT 通信

问题描述 投票:0回答:2

我有一个场景,移动应用程序调用我的应用程序托管的 REST API。在此过程中,我需要通过 MQTT 向下游系统发送消息,并等待收到该消息的响应。然后我回复了移动应用程序。

这里的挑战是,通过 MQTT 的消息传递是异步的。因此,我收到的消息将在不同的线程中(某个侦听器类,侦听 messageArrived())。如何返回调用http线程?

Paho 库支持同步通信吗?比如我发送一条消息,打开某个主题并等待它直到收到消息或超时?

mqtt paho
2个回答
0
投票
# python3.6

import random

from paho.mqtt import client as mqtt_client
from queue import Queue
import datetime
import time


broker = '127.0.0.1'#'broker.emqx.io'
port = 1883
topic_1 = "Parent/Topic1"
topic_2 = "Parent/Topic2"
msg1_start = "topic1: start"
msg1_started = "topic1: started"
msg1_finished = "topic1: finished"
msg2_start = "topic2: start"
msg2_started = "topic2: started"
msg2_finished = "topic2: finished"
# Generate a Client ID with the subscribe prefix.
client_id = f'subscribe-{random.randint(0, 100)}'
# username = 'emqx'
# password = 'public'
msgQueue = Queue()


def connect_mqtt() -> mqtt_client:
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print(f"{datetime.datetime.now()} [Info] Connected to MQTT Broker!")
        else:
            print(f"{datetime.datetime.now()} [Error] Failed to connect, return code %d\n", rc)

    client = mqtt_client.Client(client_id)
    # client.username_pw_set(username, password)
    client.on_connect = on_connect
    client.connect(broker, port)
    return client


def subscribe(client: mqtt_client):
    def on_message(client, userdata, msg):
        print(f"{datetime.datetime.now()} [Info] Received `{msg.payload.decode()}` from `{msg.topic}` topic")
        msgQueue.put(msg)

    client.subscribe(topic_1)
    client.subscribe(topic_2)
    client.on_message = on_message

def publish(client,topic,msg):

        result = client.publish(topic, msg)
        # result: [0, 1]
        status = result[0]
        if status == 0:
            print(f"{datetime.datetime.now()} [Info] Send `{msg}` to topic `{topic}`")
        else:
            print(f"{datetime.datetime.now()} [Error] Failed to send message to topic {topic}")    


def run():
    client = connect_mqtt()
    time.sleep(1)
    publish(client,topic_1,msg1_start)
    subscribe(client)
    #client.loop_forever()
    client.loop_start()
    while True:
        message = msgQueue.get()
        if message is None:
            continue
        print(f"{datetime.datetime.now()} [Info] received from queue",str(message.payload.decode("utf-8")))
        if message.topic == topic_1 and message.payload.decode() == msg1_finished:
            publish(client,topic_2,msg2_start)
        elif message.topic == topic_2 and message.payload.decode() == msg2_finished:
            break

       
    client.loop_stop() #todo client thread needs to be stopped when filling completed.

    

if __name__ == '__main__':
    run()

-2
投票

MQTT 本质上是异步的,所有 Pub/Sub 实现也是如此。在协议级别没有回复消息的概念,您无法知道是否会收到对已发布消息的回复(或者可能会收到很多回复),因为您甚至不知道是否有订阅您发布的主题。

可以构建一个以这种方式工作的系统,但您需要维护所有正在运行的请求的状态机,实施合理的超时策略,并确定如果收到多个响应该怎么办。

您没有提到您正在使用哪些不同的 Paho 库,但我从方法名称猜测是 Java,但在不知道您正在使用什么 HTTP 框架以及许多其他因素的情况下,我不会建议解决方案,特别是因为它将涉及大量轮询和同步。

是否有什么原因导致移动应用无法直接发布和订阅 MQTT 主题?这将消除对此的需要。

© www.soinside.com 2019 - 2024. All rights reserved.