通过 Python influxdb_client 在 Flux 查询中使用参数

问题描述 投票:0回答:4

我正在尝试更新所有 influxdb python 查询,以便它们不易受到 sql 注入的攻击。

为此,我读到您可以将参数与

query_api()
一起使用,特别是与
query_data_frame()
一起使用(https://medium.com/sekoia-io-blog/avoiding-injections-with-influxdb-bind -参数-50f67e379abb

我遇到的问题是我不知道如何将我的参数传递到我的查询中。以下是我们的一个查询示例:

client = InfluxDBClient(url="localhost:5000", token="", timeout=100000, retries=0, enable_gzip=True, profilers="query, operator")
query_api = client.query_api()

ver = "data" # This variable would actually come from a function
params = {
    "ver": ver,
}
query =                      '''from(bucket: "db")
                                |> range(start: -200d)
                                |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
                                |> filter(fn: (r) => r._measurement == "test_result")
                                |> filter(fn: (r) => r.version == ver)
                                |> keep(columns: ["_time", "test", "run", "status_tag", "duration_sec", "version"])'''

df = query_api.query_data_frame(query=query, params=params)

运行上面的代码给我一个

HTTP response body: b'{"error":"type error 5:75-5:78: undefined identifier \\"ver\\""}\n'
错误。

有谁知道如何使用 Python 将参数正确注入到 Flux 查询中?

我还使用了以下方法来寻求帮助: https://influxdb-client.readthedocs.io/_/downloads/en/stable/pdf/

根据用户16442705问题更新

我在字典中尝试了另一个变量名,它产生了相同的结果。我还尝试在查询中使用 $ ,这产生了不同的错误。请参阅以下有错误的代码:

client = InfluxDBClient(url="localhost:5000", token="", timeout=100000, retries=0, enable_gzip=True, profilers="query, operator")
query_api = client.query_api()

ver = "data" # This variable would actually come from a function
params = {
    "pVersion": ver,
}
query =                      '''from(bucket: "db")
                                |> range(start: -200d)
                                |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
                                |> filter(fn: (r) => r._measurement == "test_result")
                                |> filter(fn: (r) => r.version == pVersion)
                                |> keep(columns: ["_time", "test", "run", "status_tag", "duration_sec", "version"])'''

df = query_api.query_data_frame(query=query, params=params)

HTTP response body: b'{"error":"type error 5:67-5:80: undefined identifier \\"pVersion\\""}\n'

client = InfluxDBClient(url="localhost:5000", token="", timeout=100000, retries=0, enable_gzip=True, profilers="query, operator")
query_api = client.query_api()

ver = "data" # This variable would actually come from a function
params = {
    "pVersion": ver,
}
query =                      '''from(bucket: "db")
                                |> range(start: -200d)
                                |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
                                |> filter(fn: (r) => r._measurement == "test_result")
                                |> filter(fn: (r) => r.version == $pVersion)
                                |> keep(columns: ["_time", "test", "run", "status_tag", "duration_sec", "version"])'''

df = query_api.query_data_frame(query=query, params=params)

HTTP response body: b'{"error":"loc 0:0-0:0: expected an operator between two expressions"}\n'

另一个需要注意的数据点是我们正在使用以下版本:

  • Influxdb-版本:1.8.6
  • influxdb-客户端:1.19.0
python sql-injection influxdb influxdb-python flux-influxdb
4个回答
2
投票

参数化查询似乎仅受 Influx v2 Cloud 支持。我已经在 Golang 中解决了一些类似的问题,发现我本地的 Influx 实例不支持它。有点令人沮丧的是,这只是一个云功能。

Influx 关于参数化查询的博客文章第三句话第一段指出它是 Influx 云功能

Influx 云文档Influx V2 OSS 文档 请注意,页面底部没有参数化查询的文档


1
投票

问题实际上出在我使用的 influxdb 版本(1.8.6)上。查询参数不是 Influxdb 1.8.6 的功能,仅在 Influxdb 2.0.x 中引入

请参阅下面的链接,了解 Influxdb-python-client 团队提出的问题。 https://github.com/influxdata/influxdb-client-python/issues/285


0
投票

尝试在查询字符串中使用

$ver
,而不是
ver

client = InfluxDBClient(url="localhost:5000", token="", timeout=100000, retries=0, enable_gzip=True, profilers="query, operator")
query_api = client.query_api()

ver = "data" # This variable would actually come from a function
params = {
    "ver": ver,
}
query = '''from(bucket: "db")
           |> range(start: -200d)
           |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
           |> filter(fn: (r) => r._measurement == "test_result")
           |> filter(fn: (r) => r.version == $ver)  # <--------------------------------- 
           |> keep(columns: ["_time", "test", "run", "status_tag", "duration_sec", "version"])'''

df = query_api.query_data_frame(query=query, params=params)

免责声明


0
投票

我建议您按如下方式构建查询:

mydelay = -5
myquery = ('from(bucket: "mybucket")
            |> range(start: {0}s)
            |> filter(fn: (r) => r["_field"] == "myfield")
            |> filt`enter code here`er(fn: (r) => r["lieu"] == "mylieu")'.format(mydelay))

所以,一切都变得可能。

© www.soinside.com 2019 - 2024. All rights reserved.