MQTT and PostgreSQL: inserting MQTT messages into a column of a PostgreSQL table with Python

Question (votes: 0, answers: 2)

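I am new to PostgreSQL and want to insert MQTT messages into a PostgreSQL database using psycopg2. Unfortunately, it does not work as expected. I think it is a simple mistake, but I cannot figure out what exactly is wrong. First, I published MQTT messages to the Mosquitto broker with a Python script [1], then subscribed from another script [2] and tried to store the messages in PostgreSQL. The resulting error messages are shown in [3].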

Here is my publisher script [1], which publishes fake MQTT JSON data to the Mosquitto broker:

#!/usr/bin/python

import paho.mqtt.client as mqtt
import numpy as np
import time

broker_address = "localhost"

def on_connect(client, userdata, flags, rc):
  print("Connected with result code " + str(rc))

client = mqtt.Client()
client.on_connect = on_connect
client.connect(broker_address,1883,60)
client.loop_start()

while True:
  time.sleep(0.05)
  degrees = np.random.random_sample()
  toa = np.random.random_sample()
  humidity = np.random.random_sample()
  json =  ('''[{"time": "2020-04-01 21:00:00",  "device_addr": "buizg8b8",  "FCntUp": "7281",   "CF":"867900000",   "BW":"125000",  "SF":"10",  "RSSI":"-121","SNR": "-14", "sec":"123564574567", "nsec":   "245244546", "offset":"4184",   "Uncertainty": "7816",  "Offset Uncertainty":"201.17"   ,"device EUI":"ruzfv276gz2v23g", "Id":"0" ,     "Latitude": "30.347834" , "Longitude":"20.34763",   " Altitude":"500","MIC":"87hiub87"}]''')

  locpk= '{"mote":"niwuinnwe","frame_cnt":2000,"hz":902700000,"bandwidth":125000,"sf":7,"antenna":0,"sec":1235346545645,"nsec":245245245,"rssi_db":-93,"snr_db":10.0,"o_hz":14994,"u_nsec":5.6,"u_hz":1.16,"lat":51.120052,"lon":-114.041752,"alt":1061,"device_id":"3f3g3g354bv42rr4rg"}'

  locpk= str(locpk)
  json= str(json)
  client.publish("device14/geo", locpk, 1, 1)
  client.publish("device14/geo", json, 1, 1)

Here is my subscriber script [2], which subscribes to the published messages and inserts them into PostgreSQL:

#!/usr/bin/python
import psycopg2
from psycopg2 import connect, Error
from config import config
import paho.mqtt.client as mqtt
import datetime
import time

def on_connect(client, userdata, flags, rc):
    print("Connected with result code "+str(rc))
    client.subscribe("device14/geo",0)

def on_message(client, userdata, msg):
    Date = datetime.datetime.utcnow()
    message= msg.payload.decode()
    try:
            #print the JSON Message with Data and Topic
            print(str(Date) + ": " + msg.topic + " " + str(message))
            #concatenate the SQL string
            sql_string = "INSERT INTO table_name(column_name)\nVALUES %s" % (message)
            #execute the INSERT statement
            cur = conn.cursor()
            cur.execute(sql_string)
            #commit the changes to the database
            conn.commit()
            print("Finished writing to PostgreSQL")
    except (Exception, Error) as err:
            print("\npsycopg2 connect error:", err)
            #print("Could not insert " + message + " into Postgresql DB")

#Set up a client for Postgresql DB
try:
    #read connection parameters
    params = config()
    #connect to the PostgreSQL server
    print('Connecting to the PostgreSQL database...')
    conn = psycopg2.connect(**params)
    #create a cursor
    cur = conn.cursor()
    #execute a statement
    print('PostgreSQL database version:')
    cur.execute('SELECT version()')
    cur.execute(sql)
    #display the PostgreSQL database server version
    db_version = cur.fetchone()

    print(db_version)

except (Exception, psycopg2.DatabaseError) as error:
    print(error)
#Initialize the MQTT client that should connect to the Mosquitto broker
client = mqtt.Client()

#set last will message
client.will_set('Postgresql_Paho-Client/lastwill','Last will message', 1, True)

client.on_connect = on_connect
client.on_message = on_message
connOK=False
while(connOK == False):
    try:
        client.connect("localhost", 1883, 60)
        connOK = True
    except:
        connOK = False
    time.sleep(2)
#Blocking loop to the Mosquitto broker
client.loop_forever()

Error [3]:

/home/osboxes/postgresql/bin/python /home/osboxes/PycharmProjects/postgresql/geo_store.py


Connecting to the PostgreSQL database...

PostgreSQL database version:

no results to fetch

Connected with result code 0

Received a message on topic: device14/geo


2020-04-10 15:18:00.336002: device14/geo [{"time": "2020-04-01 21:00:00",   "device_addr": "buizg8b8",  "FCntUp": "7281",   "CF":"867900000",   "BW":"125000",  "SF":"10",  "RSSI":"-121","SNR": "-14", "sec":"123564574567", "nsec":   "245244546", "offset":"4184",   "Uncertainty": "7816",  "Offset Uncertainty":"201.17"   ,"device EUI":"ruzfv276gz2v23g", "Id":"0" ,     "Latitude": "30.347834" , "Longitude":"20.34763",   " Altitude":"500","MIC":"87hiub87"}]


psycopg2 connect error: syntax error at or near "["
LINE 2: VALUES [{"time": "2020-04-05 21:00:00", "device_addr": "buizg...
               ^


Received a message on topic: device14/geo

2020-04-10 15:18:00.366786: device14/geo {"mote":"niwuinnwe","frame_cnt":2000,"hz":902700000,"bandwidth":125000,"sf":7,"antenna":0,"sec":1235346545645,"nsec":245245245,"rssi_db":-93,"snr_db":10.0,"o_hz":14994,"u_nsec":5.6,"u_hz":1.16,"lat":51.120052,"lon":-114.041752,"alt":1061,"device_id":"3f3g3g354bv42rr4rg"}

psycopg2 connect error: syntax error at or near "{"
LINE 2: VALUES {"mote":"niwuinnwe","frame_cnt":2000,"hz":902700000...

I look forward to your comments. Any help would be greatly appreciated.

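PS: I also tried changing the structure of the published messages (i.e. locpk, json), but it did not help. If you have any suggestions about the structure of the published messages, please let me know and I will try them.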

python postgresql mqtt psycopg2 paho
2 answers
0 votes

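I have not seen the structure of table_name, but if it has only a single column (column_name) and you want to store JSON documents in it, define that column as jsonb in PostgreSQL. Inserting data into such a column is easy: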

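import json
from psycopg2.extras import Json
...
query = "INSERT INTO table_name(column_name) VALUES (%s)"
# message is a str; parse it first so the column holds a JSON object or array
# rather than a JSON string scalar
data = (Json(json.loads(message)),)
cur.execute(query, data)
conn.commit()
...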
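
For context on the syntax errors in the question: the `%` string operator pastes the payload into the statement text unquoted, so PostgreSQL sees `VALUES {"mote": ...` and stops at the first brace or bracket. The sketch below contrasts that with a bound parameter. It is only an illustration: `RecordingCursor` is a hypothetical stand-in for a psycopg2 cursor so the example runs without a database, `store_message` is a made-up helper name, and `table_name` / `column_name` are the placeholders from the question.

```python
import json

PAYLOAD = '{"mote": "niwuinnwe", "frame_cnt": 2000}'  # shortened sample payload

# What the question's script builds: the payload lands in the SQL text unquoted.
broken_sql = "INSERT INTO table_name(column_name)\nVALUES %s" % PAYLOAD

QUERY = "INSERT INTO table_name(column_name) VALUES (%s)"


class RecordingCursor:
    """Hypothetical stand-in for a DB-API cursor; it only records calls."""

    def __init__(self):
        self.calls = []

    def execute(self, query, params=None):
        self.calls.append((query, params))


def store_message(cur, payload):
    """Check that the payload is valid JSON, then pass it as a bound parameter."""
    document = json.loads(payload)  # raises ValueError on malformed JSON
    # The driver quotes the parameter; the SQL text itself never changes.
    cur.execute(QUERY, (json.dumps(document),))


cur = RecordingCursor()
store_message(cur, PAYLOAD)
print(broken_sql)    # the statement PostgreSQL rejected
print(cur.calls[0])  # query and parameters kept separate
```

With a real psycopg2 cursor, the same `store_message` call would be followed by `conn.commit()`, as in the answer's snippet.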

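However, using a single column for the whole message is not a good design choice. Create columns for common keys such as time, device_addr, latitude, longitude and altitude (I am only guessing here, based on the data provided). Store the less important keys (the ones that may be missing) in a separate jsonb column (called data, for example).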
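
A minimal sketch of that split, under a few assumptions: the set of "common" keys is a guess, `normalize_key` and `split_payload` are hypothetical helper names, and key names are normalized first because the published payload contains keys such as `" Altitude"` with a stray leading space.

```python
import json

# Hypothetical choice of "common" keys that get their own columns.
COMMON_KEYS = ("time", "device_addr", "latitude", "longitude", "altitude")


def normalize_key(key):
    """Trim whitespace, lowercase, and join inner words with underscores."""
    return "_".join(key.strip().lower().split())


def split_payload(payload):
    """Return (column values, JSON text of the remaining keys) for one message."""
    document = json.loads(payload)
    if isinstance(document, list):  # the publisher wraps one object in a list
        document = document[0]
    fields = {normalize_key(k): v for k, v in document.items()}
    columns = {k: fields.pop(k, None) for k in COMMON_KEYS}
    return columns, json.dumps(fields, sort_keys=True)


sample = (
    '[{"time": "2020-04-01 21:00:00", "device_addr": "buizg8b8", '
    '"Latitude": "30.347834", "Longitude": "20.34763", '
    '" Altitude": "500", "MIC": "87hiub87"}]'
)
columns, extra = split_payload(sample)
print(columns)
print(extra)
```

The values in `columns` would then be bound to the dedicated columns as query parameters, and `extra` would go into the jsonb column.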


0 votes
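  1. Solved the previous INSERT problem with SQLAlchemy. First of all, thank you very much for your suggestion. I found another solution that uses SQLAlchemy, and so far it works fine. I also tried your suggestion, but it did not work as expected: the same operation works when I run it through SQLAlchemy, but not through a plain psycopg2 cursor (as you suggested), and I do not know why. It neither shows an error nor inserts any message into the table I created. In the code below I have commented out your suggestion (in the on_message function), although I did try it. It did not work!

PS: The table structure is shown in the create_table function (for your reference).

  2. New problem: generating the primary key (id) automatically during the insert. When I make the id column the primary key, the insert cannot add rows that supply only the other columns, because it reports that a null value in the id column violates the not-null constraint. How can I have id generated automatically on every insert, so that the null error no longer appears while the column keeps its NOT NULL constraint? Please refer to table 'toa2' for this problem.

  3. In addition, I am handling GPS time in this JSON payload. When I use abstime or any other time data type for the column, I get a "value out of range" error, and SQLAlchemy suggests using a different date style. Does anyone know which style suits GPS time? I tried all the time-related types in PostgreSQL; none of them worked, so I am using the "character varying" data type for now.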
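
On the GPS time question, one possible approach is to convert the value in Python before inserting it, and to store the result in a `timestamp with time zone` column. The sketch below assumes that "TOA sec" counts seconds since the GPS epoch (1980-01-06 00:00:00 UTC) and that "TOA nsec" is the sub-second part; the payload does not state this. `gps_to_utc` is a hypothetical helper name, and the 18-second offset is hard-coded, which is only valid for dates from 2017-01-01 until another leap second is introduced.

```python
from datetime import datetime, timedelta, timezone

GPS_EPOCH = datetime(1980, 1, 6, tzinfo=timezone.utc)
# Assumption: GPS time is 18 s ahead of UTC (true for dates since 2017-01-01).
GPS_UTC_OFFSET_S = 18


def gps_to_utc(sec, nsec=0):
    """Convert GPS seconds and nanoseconds to a timezone-aware UTC datetime."""
    return GPS_EPOCH + timedelta(
        seconds=int(sec) - GPS_UTC_OFFSET_S,
        microseconds=int(nsec) // 1000,  # datetime resolution is microseconds
    )


# Values taken from the sample payload in the log output.
toa = gps_to_utc("1270151304", "744816566")
print(toa.isoformat())  # 2020-04-05T19:48:06.744816+00:00
```

Under that assumption the result lands within a second of the payload's own `time` field (21:48:07, which matches if that field is local time at +02). psycopg2 adapts an aware `datetime` to a timestamp literal, so the value can be passed as an ordinary query parameter. Note that `abstime` was removed in PostgreSQL 12, so it is best avoided in new tables.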

Here is my updated subscriber code:

#!/usr/bin/python

import psycopg2
from psycopg2 import connect, Error
from config import config
import paho.mqtt.client as mqtt
import datetime
import time
import json
import pandas as pd
from psycopg2.extras import Json
from sqlalchemy import create_engine
engine_nf = create_engine('postgresql+psycopg2://postgres:postgres@localhost:5432/test_db')
sql_read = lambda sql: pd.read_sql(sql, engine_nf)
sql_execute = lambda sql: pd.io.sql.execute(sql, engine_nf)


def connect():
    """ Connect to the PostgreSQL database server """
    conn = None
    try:
        # read connection parameters
        params = config()
        # connect to the PostgreSQL server
        print('Connecting to the PostgreSQL database...')
        conn = psycopg2.connect(**params)
        # create a cursor
        cur = conn.cursor()
        # execute a statement
        print('PostgreSQL database version:')
        cur.execute('SELECT version()')
        # display the PostgreSQL database server version
        db_version = cur.fetchone()
        print(db_version)
        # close the communication with the PostgreSQL
        #cur.close()
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
    '''finally:
        if conn is not None:
            conn.close()
            print('Database connection closed.')'''

def create_table():
    """ create tables in the PostgreSQL database"""
    commands = (
        '''
        CREATE TABLE if not exists geoloc (
                id integer NOT NULL,
                "time" abstime,
                topic character varying,
                gateway_id character varying,
                device_address character varying,
                locpk json NOT NULL,
                PRIMARY KEY (id)
        )
        ''',
        '''
        CREATE TABLE if not exists toa(
                id integer,
                time character varying,
                device_address character varying,
                toa_sec character varying,
                toa_nsec character varying,
                frequency_offset character varying,
                toa_uncertainty character varying,
                frequency_offset_uncertainty character varying ,
                gateway_eui character varying(255) ,
                antenna_id character varying ,
                gateway_latitude character varying ,
                gateway_longitude character varying ,
                gateway_altitude character varying ,
                mic character varying
            )
        ''',
        '''
        CREATE TABLE if not exists toa2(
                id integer,
                time character varying,
                device_address character varying,
                toa_sec character varying,
                toa_nsec character varying,
                frequency_offset character varying,
                toa_uncertainty character varying,
                frequency_offset_uncertainty character varying ,
                gateway_eui character varying(255) ,
                antenna_id character varying ,
                gateway_latitude character varying ,
                gateway_longitude character varying ,
                gateway_altitude character varying ,
                mic character varying,
                PRIMARY KEY (id)
            )
        '''
    )
    conn = None
    try:
        # read the connection parameters
        params = config()
        # connect to the PostgreSQL server
        conn = psycopg2.connect(**params)
        cur = conn.cursor()
        # create table one by one
        for command in commands:
            cur.execute(command)
        # close communication with the PostgreSQL database server
        cur.close()
        # commit the changes
        conn.commit()
        print('tables created and commited')
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
    finally:
        if conn is not None:
            conn.close()

def insert(query, data):
    """ insert a new vendor into the vendors table """
    conn = None
    id = None
    try:
        # read database configuration
        params = config()
        # connect to the PostgreSQL database
        conn = psycopg2.connect(**params)
        # create a new cursor
        cur = conn.cursor()
        # execute the INSERT statement
        cur.execute(query, data)
        # get the generated id back
        id = cur.fetchone()[0]
        # commit the changes to the database
        conn.commit()
        # close communication with the database
        cur.close()
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
    finally:
        if conn is not None:
            conn.close()

    return id

def get_insert(query):
    """ query data from the vendors table """
    conn = None
    try:
        params = config()
        conn = psycopg2.connect(**params)
        cur = conn.cursor()
        cur.execute(query)
        print("The number of rows: ", cur.rowcount)
        row = cur.fetchone()
        while row is not None:
            print(row)
            row = cur.fetchone()
        # filter device_address & gateway_id from 'locpk' column & store it in another table
        cur.execute("select locpk->>'time',locpk->>'device_addr', locpk->>'TOA sec', locpk->>'TOA nsec',locpk->>'Gateway EUI', locpk->>'Gateway_Latitude', locpk->>'Gateway Longitude', locpk->>'Gateway Altitude' from public.geoloc")
        geo= cur.fetchone()
        insert= "INSERT INTO toa(time, device_address, toa_sec, toa_nsec, gateway_eui, gateway_latitude, gateway_longitude, gateway_altitude) VALUES('%s', '%s', '%s','%s','%s','%s','%s','%s')" % (geo)
        sql_execute(insert)
        print('inserted device_address')
        print('inserted gateway_id')
        cur.close()
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
    finally:
        if conn is not None:
            conn.close()

def on_connect(client, userdata, flags, rc):
    print("Connected with result code "+str(rc))
    client.subscribe("gateway14/geolocation",0)

def on_message(client, userdata, msg):
    print("Received a message on topic: " + msg.topic)
    Date = datetime.datetime.utcnow()
    message= msg.payload.decode()
    try:
            # print the JSON Messagewith Data and Topic
            print(str(Date) + ": " + msg.topic + " " + str(message))
            # concatenate the SQL string
            data= json.dumps(message)
            #data = (Json(message),)
            #query= "INSERT INTO geoloc(locpk) VALUES (%s)"
            #insert(query, data)
            print('json_data:',data)
            insert = '''
            INSERT INTO geoloc(time,topic,locpk)
            VALUES ('%s','%s','%s')
            ''' % (Date,msg.topic,data)
            sql_execute(insert)
            print("Finished writing to PostgreSQL")
            query= " SELECT time, topic, locpk FROM geoloc ORDER BY time LIMIT 1"
            get_insert(query)
    except (Exception, Error) as err:
            print("\npsycopg2 connect error:", err)
            print("Could not insert " + message + " into Postgresql DB")

# Set up a client for Postgresql DB
connect()
create_table()
# Initialize the MQTT client that should connect to the Mosquitto broker
client = mqtt.Client()

#set last will message
client.will_set('Postgresql_Paho-Client/lastwill','Last will message', 1, True)

client.on_connect = on_connect
client.on_message = on_message

connOK=False
while(connOK == False):
    try:
        client.connect("localhost", 1883, 60)
        connOK = True
    except:
        connOK = False
    time.sleep(2)

# Blocking loop to the Mosquitto broker
client.loop_forever()
Error:

/home/osboxes/postgresql/bin/python /home/osboxes/PycharmProjects/postgresql/toa.py
Connecting to the PostgreSQL database...
PostgreSQL database version:
('PostgreSQL 11.7 (Ubuntu 11.7-2.pgdg18.04+1) on x86_64-pc-linux-gnu, compiled by gcc (Ubuntu 7.4.0-1ubuntu1~18.04.1) 7.4.0, 64-bit',)
tables created and commited
Connected with result code 0
Received a message on topic: gateway14/geolocation
2020-04-12 20:20:30.372823: gateway14/geolocation {"time": "2020-04-05 21:48:07",   "device_addr": "34t324t2",  "FCntUp": "3852",   "CF":"867900000",   "BW":"125000",  "SF":"10",  "RSSI":"-121","SNR": "-14", "TOA sec":"1270151304", "TOA nsec": "744816566", "Frequency offset":"4184", "TOA Uncertainty": "7816",  "Frequency Offset Uncertainty":"201.17" ,"Gateway EUI":"wijfbwurzwr34B", "Antenna Id":"0" ,     "Gateway Latitude": "42.795888" , "Gateway Longitude":"32.118123",  " Gateway Altitude":"486","MIC":"69a6af7b"}
Finished writing to PostgreSQL
The number of rows:  1
('2020-04-11 00:39:45+02', 'gateway14/geolocation', '[{"time": "2020-04-05 21:48:07",\t"device_addr": "34t324t2",\t"FCntUp": "3852",\t"CF":"867900000",\t"BW":"125000",\t"SF":"10",\t"RSSI":"-121","SNR": "-14", "TOA sec":"1270151304", "TOA nsec":\t"744816566", "Frequency offset":"4184",\t"TOA Uncertainty": "7816",\t"Frequency Offset Uncertainty":"201.17"\t,"Gateway EUI":"wijfbwurzwr34B", "Antenna Id":"0" , \t"Gateway Latitude": "42.795888"\t, "Gateway Longitude":"32.118123",\t" Gateway Altitude":"486","MIC":"69a6af7b"}]')
(psycopg2.errors.NotNullViolation) null value in column "id" violates not-null constraint
DETAIL:  Failing row contains (null, 2020-04-05 21:48:07, 34t324t2, 1270151304, 744816566, null, null, null, wijfbwurzwr34B, null, None, 32.118123, None, null).

[SQL: INSERT INTO toa2(time, device_address, toa_sec, toa_nsec, gateway_eui, gateway_latitude, gateway_longitude, gateway_altitude) VALUES('2020-04-05 21:48:07', '34t324t2', '1270151304','744816566','wijfbwurzwr34B','None','32.118123','None')]
(Background on this error at: http://sqlalche.me/e/gkpj)
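
The NotNullViolation above is what PostgreSQL reports when a primary key column has no default and the INSERT does not supply a value. One common way around it, sketched here rather than taken from the original post, is to declare `id` as an identity column (available from PostgreSQL 10; the log shows 11.7) and to let the INSERT return the generated value. The column list is shortened for the example, `insert_toa` is a hypothetical helper name, and `FakeCursor` is a hypothetical stand-in that lets the snippet run without a database.

```python
CREATE_TOA2 = """
CREATE TABLE IF NOT EXISTS toa2 (
    id integer GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    time character varying,
    device_address character varying
)
"""

# id is omitted from the column list; RETURNING hands back the generated value.
INSERT_TOA2 = "INSERT INTO toa2 (time, device_address) VALUES (%s, %s) RETURNING id"


def insert_toa(cur, time_text, device_address):
    """Insert one row without supplying id and return the generated id."""
    cur.execute(INSERT_TOA2, (time_text, device_address))
    return cur.fetchone()[0]


class FakeCursor:
    """Hypothetical stand-in for a psycopg2 cursor, for a dry run only."""

    def __init__(self):
        self.executed = []
        self._next_id = 1

    def execute(self, query, params=None):
        self.executed.append((query, params))

    def fetchone(self):
        row = (self._next_id,)
        self._next_id += 1
        return row


cur = FakeCursor()
new_id = insert_toa(cur, "2020-04-05 21:48:07", "34t324t2")
print(new_id)
```

Because of `IF NOT EXISTS`, this statement leaves an already existing `toa2` table unchanged; the old table would have to be dropped or altered first. The `RETURNING id` clause also matters for the `insert()` helper in the code above, which calls `fetchone()` after the INSERT: without `RETURNING` there is no row to fetch.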

Any suggestions would be greatly appreciated!

Thank you very much for your help.

Best regards, 唐兹
