Paho MQTT while循环被阻止发布到另一个MQTT客户端

问题描述 投票:0回答:1

我的目标是在物联网中某些模块之间的后端上正确处理MQTT消息。不幸的是隔离区,现在我无法使用模块(实际上在其他城市中)。因此,我决定实现模块仿真器类,该类将接收我的请求或发送响应。

第二个问题是,发布后我需要等待模块ACK或ERR。对于这个问题,我制作了ack_blocker列表,如下所示:

[
    {
        "module_mac": "mac",
        "blocked": False,
        "message": {}
    },
    {
        "module_mac": "mac",
        "blocked": False,
        "message": {}
    }
]

因此,当我向特定模块发送消息时,blocked属性将设置为True,并且在发布消息后我将在while循环中等待。另一方面,我发布的消息应该到达仿真器MQTT客户端,它将解析数据并响应ERR或ACK。在收到消息后,我将阻塞属性设置回False,循环将结束,并通过错误或正确的消息将消息返回到后端视图。

问题是,来自后端的已发布消息将永远不会到达仿真器MQTT客户端。 IDK为什么,但在我的循环中是超时(10s),在此时间之后,我应该抛出模块未响应的错误。我正在非常仔细地调试整个过程,当后端将要引发错误时,我的模拟器客户端最终将收到消息。我跑了更多次,每次都会完全一样。因此,我认为该循环以某种方式阻止了该消息的发送。

这是我的循环实现:

    def send_message(self, mac: str, message: str):
        self.publish(mac, message)
        end_time = time.time() + self.timeout

        while True:
            module_ack_blocker = next(filter(lambda obj: obj.get('module_mac') == mac, self.ack_blocker), None)

            if not module_ack_blocker.get('blocked'):
                response = module_ack_blocker.get('message')

                if response.get('result') == 'OK':
                    logging.getLogger('root_logger').info(f'[MQTT]: ACK Message received.')
                    return response
                elif response.get('result') == 'ERROR':
                    raise MQTTException(response.get('details'), status_code=mqtt_status.MQTT_ERR_NOT_SUPPORTED)

            if time.time() > end_time:
                raise MQTTException('Module is not responding.', status_code=mqtt_status.MQTT_ERR_UNKNOWN)

如您所见,首先我发布消息。之后,我将计算超时,然后循环将开始。在循环中,我首先查看ack阻止程序列表中的正确字典(如我之前提到的)。我会问它是否没有被阻止。之后,如果我还有时间超时。

我的mqtt仿真器客户端看起来像这样:

class MqttClientEmulator(object):
    def __init__(self):
        self.app = None
        self.broker_host = None
        self.broker_port = None
        self.keep_alive = None
        self.timeout = None

        self.client = mqtt.Client(client_id='brewmaster_client_emulator')

    def init(self, app):
        self.broker_host = os.getenv('BROKER_HOST')
        self.broker_port = int(os.getenv('BROKER_PORT'))
        self.keep_alive = int(os.getenv('MQTT_KEEPALIVE'))
        self.timeout = int(os.getenv('MQTT_TIMEOUT'))
        self.app = app

        self.client.on_message = self.on_message

    def on_message(self, client, userdata, msg):
        topic = msg.topic
        string_message = str(msg.payload.decode('utf-8'))
        dict_message = json.loads(string_message)

        # Request result
        if dict_message.get('device_uuid'):
            print(dict_message)
            response = {
                "module_mac": topic,
                "sequence_number": 123,
                "result": "OK",
                "details": ""
            }
            time.sleep(1)   # Just for time reserve (this code will be more complicated in future)
            self.publish('brewmaster-backend', json.dumps(response))

    def connect(self):
        self.client.connect(self.broker_host, self.broker_port, self.keep_alive)
        self.client.loop_start()

    def disconnect(self):
        self.client.loop_stop()
        self.client.disconnect()

    def subscribe(self, name):
        self.client.subscribe(name)

    def publish(self, topic, message):
        self.client.publish(topic, message)

谢谢大家的帮助或想法。我也尝试过线程,也没有任何效果。如果您希望查看我项目的其他部分,请在评论中告诉我。谢谢。

python loops flask mqtt publish
1个回答
0
投票

确定,我需要更深入地研究paho MQTT库。对象MQTTMessageInfo有功能wait_for_publish。而且,如果您查看_condition对象,它已经实现了带信号量的锁。因此,我需要做的就是将我的MQTT客户端的send_message方法中的While循环更改为如下所示:

    def send_message(self, mac: str, message: str):
        result = self.publish(mac, message)
        end_time = time.time() + self.timeout

        if result.rc == mqtt.MQTT_ERR_QUEUE_SIZE:
            raise ValueError('Message is not queued due to ERR_QUEUE_SIZE')
        with result._condition:
            while True:
                module_ack_blocker = next(filter(lambda obj: obj.get('module_mac') == mac, self.ack_blocker), None)

                if not module_ack_blocker.get('blocked'):
                    response = module_ack_blocker.get('message')

                    if response.get('result') == 'OK':
                        logging.getLogger('root_logger').info(f'[MQTT]: ACK Message received.')
                        return response
                    elif response.get('result') == 'ERROR':
                        raise MQTTException(response.get('details'), status_code=mqtt_status.MQTT_ERR_NOT_SUPPORTED)

                if time.time() > end_time:
                    raise MQTTException('Daný modul neodpovedá.', status_code=mqtt_status.MQTT_ERR_UNKNOWN)
                result._condition.wait(1)

结果为MQTTMessageInfo对象,而_condition.wait(1)等待超时1秒。因此,基本上,当所有其他进程都在等待时,在1秒钟后将开始另一个while循环迭代,并检查是否已到达消息。

© www.soinside.com 2019 - 2024. All rights reserved.