我试图通过Kapacitor批量查询生成基线,通过从InfluxDB 1,2,3和4周前查询相同的间隔,然后将它向前移动并连接在一起,如下所示:
var w1 = batch
|query('SELECT mean(""value"") FROM ""MyDB"".""autogen"".""MetricName""')
.offset(1w).period(period).every(1m).align().groupBy(time(1m))
|shift(1w)
var w2 = batch
|query('SELECT mean(""value"") FROM ""MyDB"".""autogen"".""MetricName""')
.offset(2w).period(period).every(1m).align().groupBy(time(1m))
|shift(2w)
var w3 = batch
|query('SELECT mean(""value"") FROM ""MyDB"".""autogen"".""MetricName""')
.offset(3w).period(period).every(1m).align().groupBy(time(1m))
|shift(3w)
var w4 = batch
|query('SELECT mean(""value"") FROM ""MyDB"".""autogen"".""MetricName""')
.offset(4w).period(period).every(1m).align().groupBy(time(1m))
|shift(4w)
var bj = w1
|join(w2, w3, w3)
.as('w1', 'w2', 'w3', 'w4')
.fill('null')
var b = bj
|eval(lambda: (""w1.mean"" + ""w2.mean"" + ""w3.mean"" + ""w4.mean"") / float(4.0))
.as('avg')
我正在使用Full Outer Join,因为几周可能会遗漏一个值,在这种情况下,我会将基线计算为3个现值的平均值。
然而,似乎lambda不支持Mean()或任何这样的数学函数。它似乎也不支持检查空值。
有没有办法像这样计算基线?
此外,一旦计算了基线,如何保持缓存以便可以根据基线检查传入的流数据?
任何帮助表示赞赏!谢谢
首先,尝试在批处理var而不是shift上使用offset。
offset将取x前一分钟,小时,天......
应该在连接过程中使用shift节点,例如:
previous
|shift(1w)
|join(current)
......
这里有一个例子:https://github.com/influxdata/kapacitor/issues/746
关于加入4个不同的流,有不同的时间,并且由于我之前的评论,我想它不会工作......也许用union而不是join节点工作,但不确定!
您总是可以有3个刻度,检查当前到过去一周,2周等等...
“因为某些星期可能会丢失一个值” - join会永远等待这个值,而不会发出其他相应的批次并导致内存泄漏。
| barrier()节点可能有助于泄漏,但您仍然会在尝试访问缺失点属性时收到eval错误。
您希望将其拆分为多个脚本,例如计算所有4个周期的一个,为每个周期添加一些标记,如
|default().tag('stream', 'w1')
然后发送给他们
|kapacitorLoopback()
第二个监听你的环回流的脚本,| window()所有到达的点都没有分组,并计算| mean(“mean”),无论它实际上有多少个周期。