环境 - AWS Kinesis Analytics(但我可以采用建议的任何流式 SQL 概念并将其应用到我的环境中)
我有这样的源数据不断流入:
epoc_timestamp baic_id station floor ap
1552314547 alex bloor platform be-ye12
1552314548 alex bloor platform be-ye12
1552314549 alex bloor platform ye-ab70
1552314550 alex bloor platform ye-ab70
1552314551 alex bloor platform ye-ab70
1552314555 alex bloor platform ye-ab70
1552314559 alex bloor platform ge-ye30
我希望获得最后一个“ap”记录,按 baic_id 分区(有多个,而不仅仅是上面所示的“alex”)并分组为 10 秒窗口。所以上面数据的结果看起来像这样:
epoc_timestamp baic_id station floor ap
1552314540 alex bloor platform ye-ab70
1552314550 alex bloor platform ge-ye30
注意:虽然这在标准 SQL 中相对容易,例如
LAST_VALUE(ap) OVER(按 baic_id 分区,按 epoc_timestamp 范围在无界前面和无界后面之间排序)
我正在使用流式 SQL,这意味着流没有“结束”,并且 Kinesis Analytics 将不允许使用 UNBOUNDED FOLLOWING。因此,如果我使用 LAST_VALUE,它只会获取当前值,因为它看不到以下行。
这个问题大约 5 年前就被问到了..让我使用新的流式 SQL 引擎来回答它:https://github.com/timeplus-io/proton
SELECT window_start, baic_id, latest(epoc_timestamp), latest(station), latest(floor), latest(ap)
FROM tumble(stream,10s) PARTITION BY baic_id GROUP BY window_start, baic_id
tumble(stream,10s)
是每10秒创建一个翻滚窗口。
PARTITION BY baic_id
是创建子流来跟踪每个key的水印
latest(col)
是获取该窗口中的最新值。这是一个特殊的聚合函数。