Ingesting data into a mosquitto broker with QoS=1


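I am trying to ingest data with QoS=1 into a mosquitto Docker image running on my local machine. However, I can see the subscriber getting stuck at a record count of 20. For my demo application I ingest 500 records per second for 20 seconds. The subscriber shows this behavior consistently.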

Here is the data generator code.

Any help resolving this issue is much appreciated.

Publisher code:


import paho.mqtt.client as paho 
from paho import mqtt
from random import randrange, uniform
import time
from time import sleep
from multiprocessing import Process
from os import getpid
import random
import datetime


def task():
    pid = getpid()
    machine_name =  str(pid)
    topic_name = "TEMP"

  
    for i in range(20) :

        mqttBroker ="localhost" 
        client = paho.Client("Temperature_Inside")
        client.connect(mqttBroker,1883) 

        for _ in range(500):
            sensor_reading = str(random.uniform(10, 12))
            ingest_time = str(datetime.datetime.now())
            sensor_payload = "{ 'Topic_Name': \'" + topic_name + "\', 'Msg_Id': \'" + str(i)+ '-'+ str(_) + "\', 'Ingestion_Time': \'" + ingest_time + "\', 'Machine_Name': \'" + machine_name + "\','Reading': \'"+ sensor_reading +"\' }"
            print(sensor_payload)
            client.publish(topic_name,sensor_payload,qos=1)
            
        sleep(1)



# entry point
if __name__ == '__main__':
    
    n=1
    for i in range(n):
        process = Process(target=task)
        process.start()


Subscriber code:

import paho.mqtt.client as paho 
from paho import mqtt
import time

def on_message(client, userdata, message):
    print("received message: " ,str(message.payload.decode("utf-8")))
    with open('files_sub/data_recv.txt','a+') as f:
         f.write("Message received: "  + str(message.payload) + "\n")

mqttBroker ="localhost"
client = paho.Client("Smartphone")
client.connect(mqttBroker,1883) 


client.loop_start()

client.subscribe("#",qos=1)
client.on_message=on_message 

time.sleep(300)
client.loop_stop()

Here is sample output from the subscriber:

received message:  { 'Topic_Name': 'TEMP_1', 'Msg_Id': '0-0', 'Ingestion_Time': '2023-04-14 20:28:45.272753', 'Machine_Name': '89420','Reading': '11.245454110882422' }
received message:  { 'Topic_Name': 'TEMP_1', 'Msg_Id': '0-1', 'Ingestion_Time': '2023-04-14 20:28:45.272827', 'Machine_Name': '89420','Reading': '10.081152605834397' }
received message:  { 'Topic_Name': 'TEMP_1', 'Msg_Id': '0-2', 'Ingestion_Time': '2023-04-14 20:28:45.272855', 'Machine_Name': '89420','Reading': '11.220300354309748' }
received message:  { 'Topic_Name': 'TEMP_1', 'Msg_Id': '0-3', 'Ingestion_Time': '2023-04-14 20:28:45.272876', 'Machine_Name': '89420','Reading': '11.518400833393375' }
received message:  { 'Topic_Name': 'TEMP_1', 'Msg_Id': '0-4', 'Ingestion_Time': '2023-04-14 20:28:45.272892', 'Machine_Name': '89420','Reading': '10.812436901960156' }
received message:  { 'Topic_Name': 'TEMP_1', 'Msg_Id': '0-5', 'Ingestion_Time': '2023-04-14 20:28:45.272909', 'Machine_Name': '89420','Reading': '10.452335463113961' }
received message:  { 'Topic_Name': 'TEMP_1', 'Msg_Id': '0-6', 'Ingestion_Time': '2023-04-14 20:28:45.272927', 'Machine_Name': '89420','Reading': '11.169666283225068' }
received message:  { 'Topic_Name': 'TEMP_1', 'Msg_Id': '0-7', 'Ingestion_Time': '2023-04-14 20:28:45.272944', 'Machine_Name': '89420','Reading': '10.384115735731186' }
received message:  { 'Topic_Name': 'TEMP_1', 'Msg_Id': '0-8', 'Ingestion_Time': '2023-04-14 20:28:45.272962', 'Machine_Name': '89420','Reading': '10.713527094563638' }
received message:  { 'Topic_Name': 'TEMP_1', 'Msg_Id': '0-9', 'Ingestion_Time': '2023-04-14 20:28:45.272982', 'Machine_Name': '89420','Reading': '10.598582170350706' }
received message:  { 'Topic_Name': 'TEMP_1', 'Msg_Id': '0-10', 'Ingestion_Time': '2023-04-14 20:28:45.273001', 'Machine_Name': '89420','Reading': '11.361539934144364' }
received message:  { 'Topic_Name': 'TEMP_1', 'Msg_Id': '0-11', 'Ingestion_Time': '2023-04-14 20:28:45.273021', 'Machine_Name': '89420','Reading': '11.663632627831358' }
received message:  { 'Topic_Name': 'TEMP_1', 'Msg_Id': '0-12', 'Ingestion_Time': '2023-04-14 20:28:45.273044', 'Machine_Name': '89420','Reading': '10.65691717505538' }
received message:  { 'Topic_Name': 'TEMP_1', 'Msg_Id': '0-13', 'Ingestion_Time': '2023-04-14 20:28:45.273068', 'Machine_Name': '89420','Reading': '10.904640065408845' }
received message:  { 'Topic_Name': 'TEMP_1', 'Msg_Id': '0-14', 'Ingestion_Time': '2023-04-14 20:28:45.273089', 'Machine_Name': '89420','Reading': '11.203352658202002' }
received message:  { 'Topic_Name': 'TEMP_1', 'Msg_Id': '0-15', 'Ingestion_Time': '2023-04-14 20:28:45.273104', 'Machine_Name': '89420','Reading': '10.520332978869524' }
received message:  { 'Topic_Name': 'TEMP_1', 'Msg_Id': '0-16', 'Ingestion_Time': '2023-04-14 20:28:45.273122', 'Machine_Name': '89420','Reading': '10.897221818925265' }
received message:  { 'Topic_Name': 'TEMP_1', 'Msg_Id': '0-17', 'Ingestion_Time': '2023-04-14 20:28:45.273137', 'Machine_Name': '89420','Reading': '10.958659355435096' }
received message:  { 'Topic_Name': 'TEMP_1', 'Msg_Id': '0-18', 'Ingestion_Time': '2023-04-14 20:28:45.273154', 'Machine_Name': '89420','Reading': '10.143412660261575' }
received message:  { 'Topic_Name': 'TEMP_1', 'Msg_Id': '0-19', 'Ingestion_Time': '2023-04-14 20:28:45.273172', 'Machine_Name': '89420','Reading': '11.820377674321305' }
received message:  { 'Topic_Name': 'TEMP_1', 'Msg_Id': '1-0', 'Ingestion_Time': '2023-04-14 20:28:46.289744', 'Machine_Name': '89420','Reading': '11.29543750293708' }
received message:  { 'Topic_Name': 'TEMP_1', 'Msg_Id': '1-1', 'Ingestion_Time': '2023-04-14 20:28:46.290117', 'Machine_Name': '89420','Reading': '10.643004903851944' }
received message:  { 'Topic_Name': 'TEMP_1', 'Msg_Id': '1-2', 'Ingestion_Time': '2023-04-14 20:28:46.290295', 'Machine_Name': '89420','Reading': '10.458158806515144' }
received message:  { 'Topic_Name': 'TEMP_1', 'Msg_Id': '1-3', 'Ingestion_Time': '2023-04-14 20:28:46.290402', 'Machine_Name': '89420','Reading': '10.978263816038918' }
received message:  { 'Topic_Name': 'TEMP_1', 'Msg_Id': '1-4', 'Ingestion_Time': '2023-04-14 20:28:46.291308', 'Machine_Name': '89420','Reading': '10.391060321348153' }
received message:  { 'Topic_Name': 'TEMP_1', 'Msg_Id': '1-5', 'Ingestion_Time': '2023-04-14 20:28:46.291526', 'Machine_Name': '89420','Reading': '11.831071429545204' }
received message:  { 'Topic_Name': 'TEMP_1', 'Msg_Id': '1-6', 'Ingestion_Time': '2023-04-14 20:28:46.291618', 'Machine_Name': '89420','Reading': '10.905535280792117' }
received message:  { 'Topic_Name': 'TEMP_1', 'Msg_Id': '1-7', 'Ingestion_Time': '2023-04-14 20:28:46.291682', 'Machine_Name': '89420','Reading': '10.8712642203881' }
received message:  { 'Topic_Name': 'TEMP_1', 'Msg_Id': '1-8', 'Ingestion_Time': '2023-04-14 20:28:46.291735', 'Machine_Name': '89420','Reading': '10.736403240547105' }
received message:  { 'Topic_Name': 'TEMP_1', 'Msg_Id': '1-9', 'Ingestion_Time': '2023-04-14 20:28:46.291782', 'Machine_Name': '89420','Reading': '10.668622663674302' }
received message:  { 'Topic_Name': 'TEMP_1', 'Msg_Id': '1-10', 'Ingestion_Time': '2023-04-14 20:28:46.291842', 'Machine_Name': '89420','Reading': '10.008705910490459' }
received message:  { 'Topic_Name': 'TEMP_1', 'Msg_Id': '1-11', 'Ingestion_Time': '2023-04-14 20:28:46.291904', 'Machine_Name': '89420','Reading': '11.52470881947238' }
received message:  { 'Topic_Name': 'TEMP_1', 'Msg_Id': '1-12', 'Ingestion_Time': '2023-04-14 20:28:46.291962', 'Machine_Name': '89420','Reading': '11.478288851483171' }
received message:  { 'Topic_Name': 'TEMP_1', 'Msg_Id': '1-13', 'Ingestion_Time': '2023-04-14 20:28:46.292018', 'Machine_Name': '89420','Reading': '11.43596822016599' }
received message:  { 'Topic_Name': 'TEMP_1', 'Msg_Id': '1-14', 'Ingestion_Time': '2023-04-14 20:28:46.292073', 'Machine_Name': '89420','Reading': '10.07599401587021' }
received message:  { 'Topic_Name': 'TEMP_1', 'Msg_Id': '1-15', 'Ingestion_Time': '2023-04-14 20:28:46.292128', 'Machine_Name': '89420','Reading': '11.429841926223641' }
received message:  { 'Topic_Name': 'TEMP_1', 'Msg_Id': '1-16', 'Ingestion_Time': '2023-04-14 20:28:46.292184', 'Machine_Name': '89420','Reading': '11.975174671732594' }
received message:  { 'Topic_Name': 'TEMP_1', 'Msg_Id': '1-17', 'Ingestion_Time': '2023-04-14 20:28:46.292232', 'Machine_Name': '89420','Reading': '10.09898131578267' }
received message:  { 'Topic_Name': 'TEMP_1', 'Msg_Id': '1-18', 'Ingestion_Time': '2023-04-14 20:28:46.292274', 'Machine_Name': '89420','Reading': '10.582514581651441' }
received message:  { 'Topic_Name': 'TEMP_1', 'Msg_Id': '1-19', 'Ingestion_Time': '2023-04-14 20:28:46.292317', 'Machine_Name': '89420','Reading': '10.532774491896046' }
mqtt iot mosquitto paho
1 answer

Rtiabata Saha,

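The Paho Python MQTT client implements a network loop that handles network traffic and tracks message delivery according to QoS. You are most likely hitting a buffer limit: either the number of messages that can be sent without calling the network loop, or the number of QoS 1 messages that can be in flight while the client is not processing acknowledgements.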
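
To illustrate why the count would stop at exactly 20, here is a toy model of an in-flight window. It is not paho's implementation; the class InflightWindow and its methods are made up for this sketch. The only fact borrowed from paho-mqtt is that its in-flight limit (set with max_inflight_messages_set()) defaults to 20, which would match the 20 messages per connection in the subscriber output.

```python
from collections import deque


class InflightWindow:
    """Toy model of a QoS 1 sender with a cap on unacknowledged messages.

    Hypothetical class for illustration only, not part of paho-mqtt.
    """

    def __init__(self, max_inflight=20):
        self.max_inflight = max_inflight
        self.inflight = set()   # sent, still waiting for a PUBACK
        self.queued = deque()   # accepted by publish() but not sent yet
        self.sent = []          # message ids that actually went out

    def publish(self, mid):
        # Send immediately only while the window has room, otherwise queue.
        if len(self.inflight) < self.max_inflight:
            self._send(mid)
        else:
            self.queued.append(mid)

    def _send(self, mid):
        self.inflight.add(mid)
        self.sent.append(mid)

    def process_acks(self):
        # Stand-in for the network loop: treat everything in flight as
        # acknowledged, then refill the window from the queue.
        self.inflight.clear()
        while self.queued and len(self.inflight) < self.max_inflight:
            self._send(self.queued.popleft())


# Without a network loop nothing is ever acknowledged.
no_loop = InflightWindow()
for mid in range(500):
    no_loop.publish(mid)
print(len(no_loop.sent), len(no_loop.queued))  # prints: 20 480

# With acknowledgements being processed, the queue drains completely.
with_loop = InflightWindow()
for mid in range(500):
    with_loop.publish(mid)
while with_loop.queued:
    with_loop.process_acks()
print(len(with_loop.sent))  # prints: 500
```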

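You can choose how to manage the network loop. Reference: https://pypi.org/project/paho-mqtt/#network-loop. The simplest way to get started looks to be calling loop_start() after connecting, which spawns a background thread that handles the network loop asynchronously. From the reference: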

mqttc.connect("mqtt.eclipseprojects.io")
mqttc.loop_start()

while True:
    temperature = sensor.blocking_read()
    mqttc.publish("paho/temperature", temperature)
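
Applied to the publisher in the question, that could look roughly like the sketch below. It is one possible arrangement, not the only fix, and it has not been run against the asker's broker. It assumes a single connection reused for all batches, and it waits for each batch to be acknowledged with wait_for_publish() on the object that publish() returns. The helper names make_client, publish_batch and main are made up here, and the payload is built with json.dumps instead of string concatenation. The constructor check is there because paho-mqtt 2.x expects a CallbackAPIVersion as the first argument, while the question uses the 1.x form.

```python
import json
import random
import time
from datetime import datetime


def make_client(client_id, host="localhost", port=1883):
    """Connect once and start paho's background network loop."""
    import paho.mqtt.client as paho  # third-party: pip install paho-mqtt

    if hasattr(paho, "CallbackAPIVersion"):  # paho-mqtt 2.x
        client = paho.Client(paho.CallbackAPIVersion.VERSION2, client_id=client_id)
    else:  # paho-mqtt 1.x, as used in the question
        client = paho.Client(client_id)
    client.connect(host, port)
    client.loop_start()  # background thread sends queued messages, handles PUBACKs
    return client


def publish_batch(client, topic, batch_no, count, machine_name="demo"):
    """Publish `count` QoS 1 readings, then block until all are acknowledged."""
    pending = []
    for n in range(count):
        payload = json.dumps({
            "Topic_Name": topic,
            "Msg_Id": f"{batch_no}-{n}",
            "Ingestion_Time": str(datetime.now()),
            "Machine_Name": machine_name,
            "Reading": random.uniform(10, 12),
        })
        pending.append(client.publish(topic, payload, qos=1))
    for info in pending:
        info.wait_for_publish()  # returns once the broker has acknowledged
    return len(pending)


def main():
    """20 batches of 500 messages, one batch per second, over one connection."""
    client = make_client("Temperature_Inside")
    try:
        for batch_no in range(20):
            publish_batch(client, "TEMP", batch_no, 500)
            time.sleep(1)
    finally:
        client.disconnect()
        client.loop_stop()
```

With a mosquitto broker listening on localhost:1883, calling main() runs the 20-second demo. Because publish_batch only needs an object with a publish() method, it can also be exercised without a broker by passing in a stub client.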