我在下面定义了输入流。 Datetime字符串类似于2010-09-01 06:59:00.000
,结果是类似157,382
的double,而UnixDateTime的类型很长,类似于1283324340111
。
define stream HStream(ID int, DateTime String, Result double, UnixDateTime long);
我想对100个事件进行长度批处理,这些事件显示result
列的平均值,并且我想将这些批处理相互比较。我想对接下来的5个批次(每个都包含100个事件)进行这种滑动比较。所以我想将第一批(0-100事件)与第二批(101-200)进行比较,直到第六批(501-600)。我希望第二批进行比较直到第7批。我想通过比较实现的是,当4个或更多(来自5个)批次的批次平均结果都大于或小于1(与原始批次的平均结果相比)时,我想记录有关原始批次的信息。
我的代码在下面。这个问题我不知道确切的语法。我查看了WSO2和Siddhi的教程和文档,但无法解决问题。
@info(name = 'MovingAverageQuery')
from every e1=HStream, e2=HStream[e1.avg(Result) <= avg(Result))+, e2=HStream[e2[last].avg(Result) <= avg(Result)]
select ID, DateTime, Result,
avg(Result), UnixDateTime
output last every 100 events
insert into OutputStream;
@sink(type='log', prefix='LOGGER')
define stream OutputStream(Nr ID, DateTime String, Result double, Avg double, UnixDateTime long);
您必须对需求使用两个查询,一个是计算平均值(Average100Query),另一个是比较平均值(IdentifyIncreaseingTrend)。
@App:name("AverageSequence")
@App:description("Identify the average increase trend")
define stream HStream(ID int, DateTime String, Result double, UnixDateTime long);
@sink(type='log', prefix='LOGGER')
define stream OutputStream(ID int, DateTime String, avgResult double, UnixDateTime long);
@info(name = 'Average100Query')
from HStream#window.lengthBatch(100)
select ID, DateTime, avg(Result) as avgResult, UnixDateTime
insert into AverageStream;
@info(name='IdentifyIncreaseingTrend')
from every e1=AverageStream, e2=AverageStream[e2.avgResult >= (e1.avgResult + 1)], e3=AverageStream[e3.avgResult >= (e2.avgResult + 1)], e4=AverageStream[e4.avgResult >= (e3.avgResult + 1)], e5=AverageStream[e5.avgResult >= (e4.avgResult + 1)]
select e1.ID, e1.DateTime, e1.avgResult, e1.UnixDateTime
insert into OutputStream;
我注意到的一些语法问题是在计算完成之后,例如sum(result),您必须使用as
关键字将该属性命名为sum(result) as totalResult
。在序列中,不能使用平均值函数,因为它需要对多个事件进行处理,但是可以使用重命名的属性totalResult
。