使用Python将自定义时间戳写入InfluxDB

问题描述 投票:0回答:2

我目前正在努力解决 Python 中 influx_client 的基本功能。我有一组时间序列数据,我想将其添加到不同客户端上的 influxdb 中。我当前的代码看起来有点像这样:

client = InfluxDBClient(url=f"http://{ip}:{port_db}", token=token, org=org)    
write_api = client.write_api(write_options=ASYNCHRONOUS)

p = Point("title_meas").field("column_data", value_data)
write_api.write(bucket=bucket, org=org, record=p)

现在我得到了每个点的特定时间戳,我想用作 InfluxDB 键/时间戳,但无论我尝试什么 - 它都会继续添加我的主机设备的系统时间(但当我正在处理历史数据时,我需要调整时间规范)。我如何实现自定义时间戳,或者是否有更简单的方法,而不是使用

Point
方法逐行添加数据......可能类似于 Pandas 数据框? 感谢您的每一个建议。

python influxdb
2个回答
1
投票

您可以通过线路协议、Point 对象、Pandas Dataframe 或 json Dictionary 进行编写。都是可行的方法。

如果您关心吞吐量,线路协议是最快的,但如果微小的速度差异并不重要,则可以使用您想要的任何协议。我强烈推荐阅读this。您要在流入数据点上修改的“标签”称为“_time”。

要将其添加到点,请执行以下操作:

p = Point("title_meas").field("column_data", value_data).time('2021-08-09T18:04:56.865943'))

或json字典协议:

p = {'measurement':'title_meas', 'time': '2021-08-09T18:04:56.865943',
 'tags':{'sometag': 'sometag'},
 'fields':{'column_data': value_data}
}

确保时间戳符合您的预期的最简单方法是使用 UTC/ISO 格式


0
投票

def upload_missed_data():

for entry in influxdb_miss_data_dict['Missing_Data']:
    timestamp_str = entry['Timestamp']
    device_name = entry['Device']
    value = entry['Value']
    measurement = entry['Measurement']
    
    timestamp = int(datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S').timestamp())
    
    miss_point = Point("Environmental_Data") \
        .tag("Device", device_name) \
        .field(measurement, value) \
        .time(timestamp, WritePrecision.S)
    
    try:
        write_api.write(bucket=bucket, org=org, record=miss_point)
    except Exception as e:
        print(f"Failed to upload missed data to InfluxDB: {e}")
© www.soinside.com 2019 - 2024. All rights reserved.