通过python脚本将MQTT数据写入influxdb

问题描述 投票:0回答:0

我正在尝试使用 python 脚本将一些传感器数据写入 influxdb。

传感器正在发送数据,我的 Mosquitto 代理正在接收数据:

pi@raspberrypi:~ $ mosquitto_sub -u pi -P ******* -v -t '#'
insideroom/temperature temp,site=PrintingRoom value=20.83
insideroom/humidity humidity,site=PrintingRoom value=59.90

我一直在关注这个指南:https://diyi0t.com/visualize-mqtt-data-with-influxdb-and-grafana/ 并修改了他们使用的脚本,因为我的 MQTT 主题不同。

这是脚本的输出:

pi@raspberrypi:~ $ python3 room-monitor-grafana-v2.py
MQTT to InfluxDB bridge
Connected with result code 0
Connected to Broker
insideroom/temperature b'temp,site=PrintingRoom value=20.32'
<re.Match object; span=(0, 22), match='insideroom/temperature'>
Traceback (most recent call last):
  File "/home/pi/room-monitor-grafana-v2.py", line 100, in <module>
    main()
  File "/home/pi/room-monitor-grafana-v2.py", line 95, in main
    mqtt_client.loop_forever()
  File "/home/pi/.local/lib/python3.9/site-packages/paho/mqtt/client.py", line 1756, in loop_forever
    rc = self._loop(timeout)
  File "/home/pi/.local/lib/python3.9/site-packages/paho/mqtt/client.py", line 1164, in _loop
    rc = self.loop_read()
  File "/home/pi/.local/lib/python3.9/site-packages/paho/mqtt/client.py", line 1556, in loop_read
    rc = self._packet_read()
  File "/home/pi/.local/lib/python3.9/site-packages/paho/mqtt/client.py", line 2439, in _packet_read
    rc = self._packet_handle()
  File "/home/pi/.local/lib/python3.9/site-packages/paho/mqtt/client.py", line 3033, in _packet_handle
    return self._handle_publish()
  File "/home/pi/.local/lib/python3.9/site-packages/paho/mqtt/client.py", line 3327, in _handle_publish
    self._handle_on_message(message)
  File "/home/pi/.local/lib/python3.9/site-packages/paho/mqtt/client.py", line 3570, in _handle_on_message
    on_message(self, self._userdata, message)
  File "/home/pi/room-monitor-grafana-v2.py", line 45, in on_message
    sensor_data = _parse_mqtt_message(msg.topic, msg.payload.decode('utf-8'))
  File "/home/pi/room-monitor-grafana-v2.py", line 56, in _parse_mqtt_message
    measurement = match.group(2)
IndexError: no such group

这是脚本:

#!/usr/bin/env python3

"""A MQTT to InfluxDB Bridge
This script receives MQTT data and saves those to InfluxDB.
"""

import re
from typing import NamedTuple

import paho.mqtt.client as mqtt
from influxdb import InfluxDBClient

INFLUXDB_ADDRESS = '*****'
INFLUXDB_USER = '******'
INFLUXDB_PASSWORD = '******'
INFLUXDB_DATABASE = '*******'

MQTT_ADDRESS = '*******'
MQTT_USER = 'pi'
MQTT_PASSWORD = '*******'
MQTT_TOPIC = 'insideroom/#'  
MQTT_REGEX = 'insideroom/([^/]+)'

influxdb_client = InfluxDBClient(INFLUXDB_ADDRESS, 8086, None)


class SensorData(NamedTuple):
    location: str
    measurement: str
    value: float


def on_connect(client, userdata, flags, rc):
    """ The callback for when the client receives a CONNACK response from the server."""
    print('Connected with result code ' + str(rc))
    if rc ==0:
        print("Connected to Broker")
        client.subscribe('insideroom/humidity')
        client.subscribe('insideroom/temperature')


def on_message(client, userdata, msg):
    """The callback for when a PUBLISH message is received from the server."""
    print(msg.topic + ' ' + str(msg.payload))
    sensor_data = _parse_mqtt_message(msg.topic, msg.payload.decode('utf-8'))
    print(sensor_data)
    if sensor_data is not None:
        _send_sensor_data_to_influxdb(sensor_data)


def _parse_mqtt_message(topic, payload):
    match = re.match(MQTT_REGEX, topic)
    print(match)
    if match:
        location = match.group(1)
        measurement = match.group(2)
        if measurement == 'status':
            return None
        return SensorData(location, float(payload))
    else:
        return None


def _send_sensor_data_to_influxdb(sensor_data):
    json_body = [
        {
            'measurement': sensor_data.measurement,
            'tags': {
                'location': sensor_data.location
            },
            'fields': {
                'value': sensor_data.value
            }
        }
    ]
    influxdb_client.write_points(json_body)


def _init_influxdb_database():
    databases = influxdb_client.get_list_database()
    if len(list(filter(lambda x: x['name'] == INFLUXDB_DATABASE, databases))) == 0:
        influxdb_client.create_database(INFLUXDB_DATABASE)
    influxdb_client.switch_database(INFLUXDB_DATABASE)


def main():
    _init_influxdb_database()

    mqtt_client = mqtt.Client()
    mqtt_client.username_pw_set(MQTT_USER, MQTT_PASSWORD)
    mqtt_client.on_connect = on_connect
    mqtt_client.on_message = on_message

    mqtt_client.connect(MQTT_ADDRESS, 1883)
    mqtt_client.loop_forever()


if __name__ == '__main__':
    print('MQTT to InfluxDB bridge')
    main()

我需要通过函数_parse_mqtt_message传递消息还是可以直接写入influx?如果您有任何消息来源可以为我指明正确的方向,那也将不胜感激!

python raspberry-pi mqtt influxdb
© www.soinside.com 2019 - 2024. All rights reserved.