如何为可以发送和接收消息的mqtt客户端实现多线程处理

问题描述 投票:1回答:2

我正在设置一个python MQTT客户端,该客户端应该接收特定主题的消息并将消息发布到两个不同的主题。如果接收到消息,则客户端应将主题为1的消息发送给MQTT代理。

此外,客户端应每2秒向代理发送一条主题为2的消息。

我认为我必须实现多线程,对吗?到目前为止,这是我的代码:

 #!/usr/bin/env python
import time
import paho.mqtt.client as mqtt
import socket
import json
import requests
from configparser import SafeConfigParser
from threading import Timer

def on_connect(client, userdata, flags, rc):
    print("CONNECTED")
    print("Connected with result code: ", str(rc))
    print("subscribing to topics")
    client.subscribe(mqtt_sub_topics)

def on_message(client, userdata, message):
    print("Data requested")
    client.publish(mqtt_pub_topic_control,json.dumps(msg))

def main():
    print("WAIT for max: ",delay)
    while True:
        time.sleep(delay)
        client.publish(mqtt_pub_topic_state,json.dumps(msg))

### INIT ###
........

### MQTT ###
client = mqtt.Client(hostname)
client.on_connect = on_connect
client.on_message = on_message
client.on_disconnect = on_disconnect
client.connect(mqtt_broker, mqtt_port)
client.loop_start()

### Start MAIN ###
main()

关于我的代码,我有几个问题:

实施的time.sleep(delay)延迟是否存在问题?使用计时器更好吗?是否可以同时接收多个消息?

python python-3.x multithreading mqtt paho
2个回答
2
投票

我会很清楚地回答MQTT问题。

MQTT客户端是单线程的,它一次只能接收和处理一条消息,如果要并行处理多条消息,则需要拥有自己的线程池并使用on_message函数进行切换传入消息到要处理的池中。


0
投票

这里是每个发布者和订阅者的带线程代码

import threading
import time
import paho.mqtt.client as mqtt

import json
topic="data"
broker="test.mosquitto.org"
port=1883
def on_connect(client, userdata, flags, rc):
    print("CONNECTED")
    print("Connected with result code: ", str(rc))
    client.subscribe("data")
    print("subscribing to topic : "+topic)


def on_message(client, userdata, message):
    print("Data requested "+str(message.payload))


def main():
    print("WAIT for max: ",2)
    while True:
        time.sleep(1)
        client.publish(topic,"dfdfd")

### MQTT ###
client = mqtt.Client()
client.connect(broker, port)
client.on_connect = on_connect

#client.on_disconnect = on_disconnect
def subscribing():
    client.on_message = on_message
    client.loop_forever()
sub=threading.Thread(target=subscribing)
pub=threading.Thread(target=main)

### Start MAIN ###

sub.start()
pub.start()
© www.soinside.com 2019 - 2024. All rights reserved.