我正在设置一个python MQTT客户端,该客户端应该接收特定主题的消息并将消息发布到两个不同的主题。如果接收到消息,则客户端应将主题为1的消息发送给MQTT代理。
此外,客户端应每2秒向代理发送一条主题为2的消息。
我认为我必须实现多线程,对吗?到目前为止,这是我的代码:
#!/usr/bin/env python
import time
import paho.mqtt.client as mqtt
import socket
import json
import requests
from configparser import SafeConfigParser
from threading import Timer
def on_connect(client, userdata, flags, rc):
print("CONNECTED")
print("Connected with result code: ", str(rc))
print("subscribing to topics")
client.subscribe(mqtt_sub_topics)
def on_message(client, userdata, message):
print("Data requested")
client.publish(mqtt_pub_topic_control,json.dumps(msg))
def main():
print("WAIT for max: ",delay)
while True:
time.sleep(delay)
client.publish(mqtt_pub_topic_state,json.dumps(msg))
### INIT ###
........
### MQTT ###
client = mqtt.Client(hostname)
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
client.connect(mqtt_broker, mqtt_port)
client.loop_start()
### Start MAIN ###
main()
关于我的代码,我有几个问题:
实施的time.sleep(delay)
延迟是否存在问题?使用计时器更好吗?是否可以同时接收多个消息?
我会很清楚地回答MQTT问题。
MQTT客户端是单线程的,它一次只能接收和处理一条消息,如果要并行处理多条消息,则需要拥有自己的线程池并使用on_message
函数进行切换传入消息到要处理的池中。
这里是每个发布者和订阅者的带线程代码
import threading
import time
import paho.mqtt.client as mqtt
import json
topic="data"
broker="test.mosquitto.org"
port=1883
def on_connect(client, userdata, flags, rc):
print("CONNECTED")
print("Connected with result code: ", str(rc))
client.subscribe("data")
print("subscribing to topic : "+topic)
def on_message(client, userdata, message):
print("Data requested "+str(message.payload))
def main():
print("WAIT for max: ",2)
while True:
time.sleep(1)
client.publish(topic,"dfdfd")
### MQTT ###
client = mqtt.Client()
client.connect(broker, port)
client.on_connect = on_connect
#client.on_disconnect = on_disconnect
def subscribing():
client.on_message = on_message
client.loop_forever()
sub=threading.Thread(target=subscribing)
pub=threading.Thread(target=main)
### Start MAIN ###
sub.start()
pub.start()