我正在尝试使用 python 脚本将一些传感器数据写入 influxdb。
传感器正在发送数据,我的 Mosquitto 代理正在接收数据:
pi@raspberrypi:~ $ mosquitto_sub -u pi -P ******* -v -t '#'
insideroom/temperature temp,site=PrintingRoom value=20.83
insideroom/humidity humidity,site=PrintingRoom value=59.90
我一直在关注这个指南:https://diyi0t.com/visualize-mqtt-data-with-influxdb-and-grafana/ 并修改了他们使用的脚本,因为我的 MQTT 主题不同。
这是脚本的输出:
pi@raspberrypi:~ $ python3 room-monitor-grafana-v2.py
MQTT to InfluxDB bridge
Connected with result code 0
Connected to Broker
insideroom/temperature b'temp,site=PrintingRoom value=20.32'
<re.Match object; span=(0, 22), match='insideroom/temperature'>
Traceback (most recent call last):
File "/home/pi/room-monitor-grafana-v2.py", line 100, in <module>
main()
File "/home/pi/room-monitor-grafana-v2.py", line 95, in main
mqtt_client.loop_forever()
File "/home/pi/.local/lib/python3.9/site-packages/paho/mqtt/client.py", line 1756, in loop_forever
rc = self._loop(timeout)
File "/home/pi/.local/lib/python3.9/site-packages/paho/mqtt/client.py", line 1164, in _loop
rc = self.loop_read()
File "/home/pi/.local/lib/python3.9/site-packages/paho/mqtt/client.py", line 1556, in loop_read
rc = self._packet_read()
File "/home/pi/.local/lib/python3.9/site-packages/paho/mqtt/client.py", line 2439, in _packet_read
rc = self._packet_handle()
File "/home/pi/.local/lib/python3.9/site-packages/paho/mqtt/client.py", line 3033, in _packet_handle
return self._handle_publish()
File "/home/pi/.local/lib/python3.9/site-packages/paho/mqtt/client.py", line 3327, in _handle_publish
self._handle_on_message(message)
File "/home/pi/.local/lib/python3.9/site-packages/paho/mqtt/client.py", line 3570, in _handle_on_message
on_message(self, self._userdata, message)
File "/home/pi/room-monitor-grafana-v2.py", line 45, in on_message
sensor_data = _parse_mqtt_message(msg.topic, msg.payload.decode('utf-8'))
File "/home/pi/room-monitor-grafana-v2.py", line 56, in _parse_mqtt_message
measurement = match.group(2)
IndexError: no such group
这是脚本:
#!/usr/bin/env python3
"""A MQTT to InfluxDB Bridge
This script receives MQTT data and saves those to InfluxDB.
"""
import re
from typing import NamedTuple
import paho.mqtt.client as mqtt
from influxdb import InfluxDBClient
INFLUXDB_ADDRESS = '*****'
INFLUXDB_USER = '******'
INFLUXDB_PASSWORD = '******'
INFLUXDB_DATABASE = '*******'
MQTT_ADDRESS = '*******'
MQTT_USER = 'pi'
MQTT_PASSWORD = '*******'
MQTT_TOPIC = 'insideroom/#'
MQTT_REGEX = 'insideroom/([^/]+)'
influxdb_client = InfluxDBClient(INFLUXDB_ADDRESS, 8086, None)
class SensorData(NamedTuple):
location: str
measurement: str
value: float
def on_connect(client, userdata, flags, rc):
""" The callback for when the client receives a CONNACK response from the server."""
print('Connected with result code ' + str(rc))
if rc ==0:
print("Connected to Broker")
client.subscribe('insideroom/humidity')
client.subscribe('insideroom/temperature')
def on_message(client, userdata, msg):
"""The callback for when a PUBLISH message is received from the server."""
print(msg.topic + ' ' + str(msg.payload))
sensor_data = _parse_mqtt_message(msg.topic, msg.payload.decode('utf-8'))
print(sensor_data)
if sensor_data is not None:
_send_sensor_data_to_influxdb(sensor_data)
def _parse_mqtt_message(topic, payload):
match = re.match(MQTT_REGEX, topic)
print(match)
if match:
location = match.group(1)
measurement = match.group(2)
if measurement == 'status':
return None
return SensorData(location, float(payload))
else:
return None
def _send_sensor_data_to_influxdb(sensor_data):
json_body = [
{
'measurement': sensor_data.measurement,
'tags': {
'location': sensor_data.location
},
'fields': {
'value': sensor_data.value
}
}
]
influxdb_client.write_points(json_body)
def _init_influxdb_database():
databases = influxdb_client.get_list_database()
if len(list(filter(lambda x: x['name'] == INFLUXDB_DATABASE, databases))) == 0:
influxdb_client.create_database(INFLUXDB_DATABASE)
influxdb_client.switch_database(INFLUXDB_DATABASE)
def main():
_init_influxdb_database()
mqtt_client = mqtt.Client()
mqtt_client.username_pw_set(MQTT_USER, MQTT_PASSWORD)
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message
mqtt_client.connect(MQTT_ADDRESS, 1883)
mqtt_client.loop_forever()
if __name__ == '__main__':
print('MQTT to InfluxDB bridge')
main()
我需要通过函数_parse_mqtt_message传递消息还是可以直接写入influx?如果您有任何消息来源可以为我指明正确的方向,那也将不胜感激!