我目前正在努力解决 Python 中 influx_client 的基本功能。我有一组时间序列数据,我想将其添加到不同客户端上的 influxdb 中。我当前的代码看起来有点像这样:
client = InfluxDBClient(url=f"http://{ip}:{port_db}", token=token, org=org)
write_api = client.write_api(write_options=ASYNCHRONOUS)
p = Point("title_meas").field("column_data", value_data)
write_api.write(bucket=bucket, org=org, record=p)
现在我得到了每个点的特定时间戳,我想用作 InfluxDB 键/时间戳,但无论我尝试什么 - 它都会继续添加我的主机设备的系统时间(但当我正在处理历史数据时,我需要调整时间规范)。我如何实现自定义时间戳,或者是否有更简单的方法,而不是使用
Point
方法逐行添加数据......可能类似于 Pandas 数据框?
感谢您的每一个建议。
您可以通过线路协议、Point 对象、Pandas Dataframe 或 json Dictionary 进行编写。都是可行的方法。
如果您关心吞吐量,线路协议是最快的,但如果微小的速度差异并不重要,则可以使用您想要的任何协议。我强烈推荐阅读this。您要在流入数据点上修改的“标签”称为“_time”。
要将其添加到点,请执行以下操作:
p = Point("title_meas").field("column_data", value_data).time('2021-08-09T18:04:56.865943'))
或json字典协议:
p = {'measurement':'title_meas', 'time': '2021-08-09T18:04:56.865943',
'tags':{'sometag': 'sometag'},
'fields':{'column_data': value_data}
}
确保时间戳符合您的预期的最简单方法是使用 UTC/ISO 格式。
def upload_missed_data():
for entry in influxdb_miss_data_dict['Missing_Data']:
timestamp_str = entry['Timestamp']
device_name = entry['Device']
value = entry['Value']
measurement = entry['Measurement']
timestamp = int(datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S').timestamp())
miss_point = Point("Environmental_Data") \
.tag("Device", device_name) \
.field(measurement, value) \
.time(timestamp, WritePrecision.S)
try:
write_api.write(bucket=bucket, org=org, record=miss_point)
except Exception as e:
print(f"Failed to upload missed data to InfluxDB: {e}")