My payload is shown below. I need to get the first distinct batch value every 1 minute. Please let me know how to achieve this in Stream Analytics using ISFIRST together with LAG or LAST.
Expected output, something like:
BATCH = 01, "2015-01-01T00:00:01.0000000Z"
BATCH = 02, "2015-01-01T00:00:03.0000000Z"
BATCH = 03, "2015-01-01T00:00:06.0000000Z"
BATCH = 01, "2015-01-01T00:00:14.0000000Z"
BATCH = 02, "2015-01-01T00:00:18.0000000Z"
BATCH = 03, "2015-01-01T00:00:22.0000000Z"
BATCH = 01, "2015-01-01T00:00:27.0000000Z"
BATCH = 01, "2015-01-01T00:00:31.0000000Z"
Payload:
[{
"Payload": {
"Make": "BATCH1",
"VAL": "01",
"TS": "2015-01-01T00:00:01.0000000Z"
}
},
{
"Payload": {
"Make": "BATCH1",
"VAL": "01",
"TS": "2015-01-01T00:00:02.0000000Z"
}
},
{
"Payload": {
"Make": "BATCH1",
"VAL": "02",
"TS": "2015-01-01T00:00:03.0000000Z"
}
},
{
"Payload": {
"Make": "BATCH1",
"VAL": "02",
"TS": "2015-01-01T00:00:04.0000000Z"
}
},
{
"Payload": {
"Make": "BATCH1",
"VAL": "02",
"TS": "2015-01-01T00:00:05.0000000Z"
}
},
{"Payload": {
"Make": "BATCH1",
"VAL": "03",
"TS": "2015-01-01T00:00:06.0000000Z"
}
},
{"Payload": {
"Make": "BATCH1",
"VAL": "03",
"TS": "2015-01-01T00:00:07.0000000Z"
}
},
{"Payload": {
"Make": "BATCH1",
"VAL": "03",
"TS": "2015-01-01T00:00:10.0000000Z"
}
},
{"Payload": {
"Make": "BATCH1",
"VAL": "03",
"TS": "2015-01-01T00:00:11.0000000Z"
}
},
{"Payload": {
"Make": "BATCH1",
"VAL": "03",
"TS": "2015-01-01T00:00:12.0000000Z"
}
},
{"Payload": {
"Make": "BATCH2",
"VAL": "01",
"TS": "2015-01-01T00:00:13.0000000Z"
}
},
{"Payload": {
"Make": "BATCH2",
"VAL": "01",
"TS": "2015-01-01T00:00:14.0000000Z"
}
},
{"Payload": {
"Make": "BATCH2",
"VAL": "01",
"TS": "2015-01-01T00:00:15.0000000Z"
}
},
{"Payload": {
"Make": "BATCH2",
"VAL": "01",
"TS": "2015-01-01T00:00:16.0000000Z"
}
},
{"Payload": {
"Make": "BATCH2",
"VAL": "01",
"TS": "2015-01-01T00:00:17.0000000Z"
}
},
{"Payload": {
"Make": "BATCH2",
"VAL": "02",
"TS": "2015-01-01T00:00:18.0000000Z"
}
},
{"Payload": {
"Make": "BATCH2",
"VAL": "02",
"TS": "2015-01-01T00:00:20.0000000Z"
}
},
{"Payload": {
"Make": "BATCH2",
"VAL": "02",
"TS": "2015-01-01T00:00:21.0000000Z"
}
},
{"Payload": {
"Make": "BATCH3",
"VAL": "02",
"TS": "2015-01-01T00:00:22.0000000Z"
}
},
{"Payload": {
"Make": "BATCH3",
"VAL": "02",
"TS": "2015-01-01T00:00:23.0000000Z"
}
},
{"Payload": {
"Make": "BATCH3",
"VAL": "02",
"TS": "2015-01-01T00:00:24.0000000Z"
}
},
{"Payload": {
"Make": "BATCH3",
"VAL": "02",
"TS": "2015-01-01T00:00:25.0000000Z"
}
},
{"Payload": {
"Make": "BATCH3",
"VAL": "02",
"TS": "2015-01-01T00:00:26.0000000Z"
}
},
{"Payload": {
"Make": "BATCH4",
"VAL": "01",
"TS": "2015-01-01T00:00:27.0000000Z"
}
},
{"Payload": {
"Make": "BATCH4",
"VAL": "01",
"TS": "2015-01-01T00:00:28.0000000Z"
}
},
{"Payload": {
"Make": "BATCH4",
"VAL": "01",
"TS": "2015-01-01T00:00:29.0000000Z"
}
},
{"Payload": {
"Make": "BATCH4",
"VAL": "01",
"TS": "2015-01-01T00:00:30.0000000Z"
}
},
{"Payload": {
"Make": "BATCH5",
"VAL": "01",
"TS": "2015-01-01T00:00:31.0000000Z"
}
}
]
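Let me try to summarize your requirement as follows:
Sample input, where each batch ID may have several VAL changes within a one-minute window:
Make:batch1,val:01
Make:batch1,val:01
Make:batch1,val:02
Make:batch1,val:02
xxxxxxxxxxxxxx
Make:batch2,val:01
Make:batch2,val:01
xxxxxxxxxx
Desired output, with only the VAL changes per batch and no duplicates:
Make:batch1,val:01
Make:batch1,val:02
Make:batch2,val:01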
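The answer has two parts:
1. To collect the data in fixed intervals, you can use the built-in Tumbling Window function, as in the query below.
2. There is no built-in ASA function (such as DISTINCT) to filter out duplicates. I suggest combining GROUP BY, MAX, and an ASA UDF (user-defined function) to get close to the result.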
SQL:
SELECT g.Payload.Make,g.Payload.VAL,max(udf.convertdate(g.Payload.TS)) as TS
FROM geoinput g TIMESTAMP BY g.Payload.TS
GROUP BY g.Payload.Make,g.Payload.VAL, TumblingWindow(Duration(minute, 1))
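To make the effect of this query more concrete, here is a minimal JavaScript sketch that emulates, outside of ASA, what grouping by Make and VAL with MAX does to the events of a single one-minute window. The function name groupMaxPerWindow and the inline sample are hypothetical and only for illustration; this is not how ASA evaluates the query internally. It assumes all events passed in already belong to the same window and that all timestamps share the same ISO-8601 format, so they can be compared as strings.

```javascript
// Hypothetical emulation of: GROUP BY Make, VAL with MAX(TS) for ONE window.
// Assumption: every event in `events` falls inside the same 1-minute window.
function groupMaxPerWindow(events) {
  const groups = new Map(); // key "Make|VAL" -> row with the latest TS so far
  for (const { Payload } of events) {
    const key = `${Payload.Make}|${Payload.VAL}`;
    const seen = groups.get(key);
    // Same-format UTC ISO-8601 strings sort chronologically,
    // so a plain string comparison is enough here.
    if (!seen || Payload.TS > seen.TS) {
      groups.set(key, { Make: Payload.Make, VAL: Payload.VAL, TS: Payload.TS });
    }
  }
  return [...groups.values()];
}

// First three events of the payload from the question.
const sample = [
  { Payload: { Make: "BATCH1", VAL: "01", TS: "2015-01-01T00:00:01.0000000Z" } },
  { Payload: { Make: "BATCH1", VAL: "01", TS: "2015-01-01T00:00:02.0000000Z" } },
  { Payload: { Make: "BATCH1", VAL: "02", TS: "2015-01-01T00:00:03.0000000Z" } },
];

const rows = groupMaxPerWindow(sample);
console.log(rows); // one row per distinct Make/VAL pair
```

Because MAX keeps the latest timestamp in each group, the BATCH1/01 row carries 00:00:02 rather than 00:00:01. That is why the query only gets close to the "first occurrence" output asked for in the question.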
By the way, the UDF (convertdate) contains only the following code:
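function main(datetime) { // wrapper name assumed; the source shows only the body
    var date = new Date(datetime);
    return date.getTime(); // epoch milliseconds, so MAX can compare numbers
}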
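Another workaround: collect all the data for each 1-minute window and then use an Azure Function as the output. In the Azure Function you can process the data however you need. For example, you can store the rows in a JSON object, whose key-value structure lets you filter out duplicate rows.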
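The key-value idea can be sketched in plain JavaScript as follows. This is only an illustration under stated assumptions: the helper name dedupeFirstSeen is hypothetical, the events are assumed to arrive in timestamp order, and the wiring into an actual Azure Function handler is deliberately left out.

```javascript
// Hypothetical helper: keep the FIRST event seen for each Make/VAL pair.
// Assumption: `events` is already ordered by timestamp (arrival order).
function dedupeFirstSeen(events) {
  const firstByKey = Object.create(null); // plain object used as key-value store
  for (const { Payload } of events) {
    const key = `${Payload.Make}|${Payload.VAL}`;
    if (!(key in firstByKey)) {
      firstByKey[key] = Payload; // later duplicates of this key are ignored
    }
  }
  // Non-numeric string keys keep insertion order, so first-seen order is preserved.
  return Object.values(firstByKey);
}

// A few events taken from the payload in the question.
const batch = [
  { Payload: { Make: "BATCH1", VAL: "01", TS: "2015-01-01T00:00:01.0000000Z" } },
  { Payload: { Make: "BATCH1", VAL: "01", TS: "2015-01-01T00:00:02.0000000Z" } },
  { Payload: { Make: "BATCH1", VAL: "02", TS: "2015-01-01T00:00:03.0000000Z" } },
  { Payload: { Make: "BATCH2", VAL: "01", TS: "2015-01-01T00:00:13.0000000Z" } },
];

const unique = dedupeFirstSeen(batch);
console.log(unique.map((p) => `${p.Make},${p.VAL},${p.TS}`).join("\n"));
```

Unlike the GROUP BY plus MAX query, this keeps the earliest timestamp for each Make/VAL pair, which matches the "first distinct value" wording of the question more directly.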