我的脚本从 MQTT 服务器读取数据并将其写入 postgres 表。
我正在使用
loop_forever()
。
该程序应该不间断运行。
收到第一个连接时,一切正常,但一段时间后(从几分钟到几天)
on_connect()
会再次被调用。程序可以运行(意味着连接没有错误),但不再收到任何消息。
为了调试,我尝试了以下操作:
client.disconnect()
令我惊讶的是,第一件事和第二件事什么也没做 - 没有关于新连接的日志,并且正在运行的程序在连接恢复后继续运行。
第三次尝试没有成功,我没能成功。
其他备注:
loop_start()
代替loop_forever
,但根本没有成功所以基本上问题是:
on_connect
(并丢失传入数据)我的代码:
import json
import sys
from paho.mqtt import client as mqtt_client
import psycopg2
import logging as log
from datetime import datetime
import certifi
from collections import defaultdict
def connect_mqtt(userdict) -> mqtt_client:
def on_connect(client, userdata, flags, rc):
log.info(f"{datetime.now()}: Trying connect")
if rc == 0:
log.info(f"{datetime.now()}: Connection returned result: " + mqtt_client.connack_string(rc))
else:
log.info("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id=conf['client_id'], protocol=mqtt_client.MQTTv31, userdata=userdict)
client.tls_set(certifi.where())
client.tls_insecure_set(True)
client.username_pw_set(conf['username'], conf['password'])
client.on_connect = on_connect
client.connect(conf['broker'], conf['port'])
return client
def on_message(client, userdata, msg):
now_ts_in_s = round(datetime.timestamp(datetime.now()))
now_dt_in_s = datetime.fromtimestamp(now_ts_in_s)
try:
value = float(msg.payload.decode())
data = [now_dt_in_s, value]
insert_to_psql(userdata['conn'], data)
except ValueError:
pass
def insert_to_psql(conn, data):
cursor = conn.cursor()
insert_query = "INSERT INTO data (time, value) VALUES (%s, %s) ON CONFLICT " \
"DO " \
"NOTHING;"
cursor.execute(insert_query, data)
conn.commit()
def run():
psql_conn = "postgres://postgres:blablabla"
conn = psycopg2.connect(psql_conn)
userdict = {'collected_data': defaultdict(list), 'conn': conn, 'first_conn': True}
client = connect_mqtt(userdict)
client.subscribe(conf['topic'])
client.on_message = on_message
try:
client.loop_forever()
finally:
client.disconnect()
conn.close()
if __name__ == '__main__':
with open(sys.argv[1]) as f:
conf = json.load(f)
run()
如果调用了connect,则可能会在此之前调用disconnect。可能是一些临时的网络问题。您应该配置相应的回调。
请注意,正因为如此,最重要的是您在 on_connect 回调中订阅,而不是在该回调之外订阅。当 paho 断开连接并再次连接时,它不会自动重新订阅。这就是为什么应该在 on_connect 回调中进行订阅的原因。
对于您询问如何测试这一点。您可以运行本地 MQTT 代理,只需在您的应用程序第一次连接后将其关闭,然后再次启动即可。
除此之外,如果您相应地配置代理,消息就不会丢失。 MQTT 有针对特定目的的各种 QOS 设置。
如果您认为您的应用程序有问题,并且不是网络问题,您可以通过多次部署您的应用程序并让副本通过共享订阅进行订阅来获得更可靠的设置。 https://www.hivemq.com/blog/mqtt5-essentials-part7-shared-subscriptions/
就我而言,我在客户端中不断收到“on_connect”事件的原因是因为我已经通过相同的
client_id
在第二个进程中进行了连接,因此 Paho MQTT 服务器交替地一遍又一遍地断开第一个或第二个客户端的连接,每 2 秒一次,因为他们不断连接同一个 client_id
。