在Siddhi中相互比较长度的数据段

问题描述 投票:0回答:1

我在下面定义了输入流。 Datetime字符串类似于2010-09-01 06:59:00.000,结果是类似157,382的double,而UnixDateTime的类型很长,类似于1283324340111

define stream HStream(ID int, DateTime String, Result double, UnixDateTime long);

我想对100个事件进行长度批处理,这些事件显示result列的平均值,并且我想将这些批处理相互比较。我想对接下来的5个批次(每个都包含100个事件)进行这种滑动比较。所以我想将第一批(0-100事件)与第二批(101-200)进行比较,直到第六批(501-600)。我希望第二批进行比较直到第7批。我想通过比较实现的是,当4个或更多(来自5个)批次的批次平均结果都大于或小于1(与原始批次的平均结果相比)时,我想记录有关原始批次的信息。

我的代码在下面。这个问题我不知道确切的语法。我查看了WSO2和Siddhi的教程和文档,但无法解决问题。

@info(name = 'MovingAverageQuery')
from every e1=HStream, e2=HStream[e1.avg(Result) <= avg(Result))+, e2=HStream[e2[last].avg(Result) <= avg(Result)]
select ID, DateTime, Result, 
avg(Result), UnixDateTime
output last every 100 events
insert into OutputStream;

@sink(type='log', prefix='LOGGER')
define stream OutputStream(Nr ID, DateTime String, Result double, Avg double, UnixDateTime long);
wso2 siddhi stream-analytics
1个回答
0
投票

您必须对需求使用两个查询,一个是计算平均值(Average100Query),另一个是比较平均值(IdentifyIncreaseingTrend)。

@App:name("AverageSequence")
@App:description("Identify the average increase trend")

define stream HStream(ID int, DateTime String, Result double, UnixDateTime long);

@sink(type='log', prefix='LOGGER')
define stream OutputStream(ID int, DateTime String, avgResult double, UnixDateTime long);

@info(name = 'Average100Query')
from HStream#window.lengthBatch(100)
select ID, DateTime, avg(Result) as avgResult, UnixDateTime 
insert into AverageStream;

@info(name='IdentifyIncreaseingTrend')
from every e1=AverageStream, e2=AverageStream[e2.avgResult >= (e1.avgResult + 1)],  e3=AverageStream[e3.avgResult >= (e2.avgResult + 1)],  e4=AverageStream[e4.avgResult >= (e3.avgResult + 1)], e5=AverageStream[e5.avgResult >= (e4.avgResult + 1)]
select e1.ID, e1.DateTime, e1.avgResult, e1.UnixDateTime 
insert into OutputStream;

我注意到的一些语法问题是在计算完成之后,例如sum(result),您必须使用as关键字将该属性命名为sum(result) as totalResult。在序列中,不能使用平均值函数,因为它需要对多个事件进行处理,但是可以使用重命名的属性totalResult

© www.soinside.com 2019 - 2024. All rights reserved.