我的目标是在物联网中某些模块之间的后端上正确处理MQTT消息。不幸的是隔离区,现在我无法使用模块(实际上在其他城市中)。因此,我决定实现模块仿真器类,该类将接收我的请求或发送响应。
第二个问题是,发布后我需要等待模块ACK或ERR。对于这个问题,我制作了ack_blocker列表,如下所示:
[
{
"module_mac": "mac",
"blocked": False,
"message": {}
},
{
"module_mac": "mac",
"blocked": False,
"message": {}
}
]
因此,当我向特定模块发送消息时,blocked属性将设置为True,并且在发布消息后我将在while循环中等待。另一方面,我发布的消息应该到达仿真器MQTT客户端,它将解析数据并响应ERR或ACK。在收到消息后,我将阻塞属性设置回False,循环将结束,并通过错误或正确的消息将消息返回到后端视图。
问题是,来自后端的已发布消息将永远不会到达仿真器MQTT客户端。 IDK为什么,但在我的循环中是超时(10s),在此时间之后,我应该抛出模块未响应的错误。我正在非常仔细地调试整个过程,当后端将要引发错误时,我的模拟器客户端最终将收到消息。我跑了更多次,每次都会完全一样。因此,我认为该循环以某种方式阻止了该消息的发送。
这是我的循环实现:
def send_message(self, mac: str, message: str):
self.publish(mac, message)
end_time = time.time() + self.timeout
while True:
module_ack_blocker = next(filter(lambda obj: obj.get('module_mac') == mac, self.ack_blocker), None)
if not module_ack_blocker.get('blocked'):
response = module_ack_blocker.get('message')
if response.get('result') == 'OK':
logging.getLogger('root_logger').info(f'[MQTT]: ACK Message received.')
return response
elif response.get('result') == 'ERROR':
raise MQTTException(response.get('details'), status_code=mqtt_status.MQTT_ERR_NOT_SUPPORTED)
if time.time() > end_time:
raise MQTTException('Module is not responding.', status_code=mqtt_status.MQTT_ERR_UNKNOWN)
如您所见,首先我发布消息。之后,我将计算超时,然后循环将开始。在循环中,我首先查看ack阻止程序列表中的正确字典(如我之前提到的)。我会问它是否没有被阻止。之后,如果我还有时间超时。
我的mqtt仿真器客户端看起来像这样:
class MqttClientEmulator(object):
def __init__(self):
self.app = None
self.broker_host = None
self.broker_port = None
self.keep_alive = None
self.timeout = None
self.client = mqtt.Client(client_id='brewmaster_client_emulator')
def init(self, app):
self.broker_host = os.getenv('BROKER_HOST')
self.broker_port = int(os.getenv('BROKER_PORT'))
self.keep_alive = int(os.getenv('MQTT_KEEPALIVE'))
self.timeout = int(os.getenv('MQTT_TIMEOUT'))
self.app = app
self.client.on_message = self.on_message
def on_message(self, client, userdata, msg):
topic = msg.topic
string_message = str(msg.payload.decode('utf-8'))
dict_message = json.loads(string_message)
# Request result
if dict_message.get('device_uuid'):
print(dict_message)
response = {
"module_mac": topic,
"sequence_number": 123,
"result": "OK",
"details": ""
}
time.sleep(1) # Just for time reserve (this code will be more complicated in future)
self.publish('brewmaster-backend', json.dumps(response))
def connect(self):
self.client.connect(self.broker_host, self.broker_port, self.keep_alive)
self.client.loop_start()
def disconnect(self):
self.client.loop_stop()
self.client.disconnect()
def subscribe(self, name):
self.client.subscribe(name)
def publish(self, topic, message):
self.client.publish(topic, message)
谢谢大家的帮助或想法。我也尝试过线程,也没有任何效果。如果您希望查看我项目的其他部分,请在评论中告诉我。谢谢。
确定,我需要更深入地研究paho MQTT库。对象MQTTMessageInfo有功能wait_for_publish。而且,如果您查看_condition对象,它已经实现了带信号量的锁。因此,我需要做的就是将我的MQTT客户端的send_message方法中的While循环更改为如下所示:
def send_message(self, mac: str, message: str):
result = self.publish(mac, message)
end_time = time.time() + self.timeout
if result.rc == mqtt.MQTT_ERR_QUEUE_SIZE:
raise ValueError('Message is not queued due to ERR_QUEUE_SIZE')
with result._condition:
while True:
module_ack_blocker = next(filter(lambda obj: obj.get('module_mac') == mac, self.ack_blocker), None)
if not module_ack_blocker.get('blocked'):
response = module_ack_blocker.get('message')
if response.get('result') == 'OK':
logging.getLogger('root_logger').info(f'[MQTT]: ACK Message received.')
return response
elif response.get('result') == 'ERROR':
raise MQTTException(response.get('details'), status_code=mqtt_status.MQTT_ERR_NOT_SUPPORTED)
if time.time() > end_time:
raise MQTTException('Daný modul neodpovedá.', status_code=mqtt_status.MQTT_ERR_UNKNOWN)
result._condition.wait(1)
结果为MQTTMessageInfo对象,而_condition.wait(1)等待超时1秒。因此,基本上,当所有其他进程都在等待时,在1秒钟后将开始另一个while循环迭代,并检查是否已到达消息。