Kapacitor Lambda意思

问题描述 投票:0回答:2

我试图通过Kapacitor批量查询生成基线,通过从InfluxDB 1,2,3和4周前查询相同的间隔,然后将它向前移动并连接在一起,如下所示:

var w1 = batch
    |query('SELECT mean(""value"") FROM ""MyDB"".""autogen"".""MetricName""')
        .offset(1w).period(period).every(1m).align().groupBy(time(1m))
    |shift(1w)

var w2 = batch
    |query('SELECT mean(""value"") FROM ""MyDB"".""autogen"".""MetricName""')
        .offset(2w).period(period).every(1m).align().groupBy(time(1m))
    |shift(2w)

var w3 = batch
    |query('SELECT mean(""value"") FROM ""MyDB"".""autogen"".""MetricName""')
        .offset(3w).period(period).every(1m).align().groupBy(time(1m))
    |shift(3w)

var w4 = batch
    |query('SELECT mean(""value"") FROM ""MyDB"".""autogen"".""MetricName""')
        .offset(4w).period(period).every(1m).align().groupBy(time(1m))
    |shift(4w)

var bj = w1
    |join(w2, w3, w3)
        .as('w1', 'w2', 'w3', 'w4')
        .fill('null')

var b = bj
    |eval(lambda: (""w1.mean"" + ""w2.mean"" + ""w3.mean"" + ""w4.mean"") / float(4.0))
        .as('avg')

我正在使用Full Outer Join,因为几周可能会遗漏一个值,在这种情况下,我会将基线计算为3个现值的平均值。

然而,似乎lambda不支持Mean()或任何这样的数学函数。它似乎也不支持检查空值。

有没有办法像这样计算基线?

此外,一旦计算了基线,如何保持缓存以便可以根据基线检查传入的流数据?

任何帮助表示赞赏!谢谢

influxdb kapacitor
2个回答
0
投票

首先,尝试在批处理var而不是shift上使用offset。

offset将取x前一分钟,小时,天......

应该在连接过程中使用shift节点,例如:

previous
    |shift(1w)
    |join(current)
    ......

这里有一个例子:https://github.com/influxdata/kapacitor/issues/746

关于加入4个不同的流,有不同的时间,并且由于我之前的评论,我想它不会工作......也许用union而不是join节点工作,但不确定!

您总是可以有3个刻度,检查当前到过去一周,2周等等...


0
投票

“因为某些星期可能会丢失一个值” - join会永远等待这个值,而不会发出其他相应的批次并导致内存泄漏。

| barrier()节点可能有助于泄漏,但您仍然会在尝试访问缺失点属性时收到eval错误。

您希望将其拆分为多个脚本,例如计算所有4个周期的一个,为每个周期添加一些标记,如

|default().tag('stream', 'w1')

然后发送给他们

|kapacitorLoopback()

第二个监听你的环回流的脚本,| window()所有到达的点都没有分组,并计算| mean(“mean”),无论它实际上有多少个周期。

© www.soinside.com 2019 - 2024. All rights reserved.