如何从窗口中仅获取最新的行

问题描述 投票:0回答:1

我正在使用Kinesis Analytics,我正在尝试了解如何编写我的应用程序,以便在24小时内为我提供一个滑动窗口。我所生成的数据是正确的,但它看起来每次都会重新生成它,这可能就是它应该做的事情,而我自己的无知使我无法正确看待问题?

我想做的事:

我有一些设备可以提供Kinesis Stream,这个Kinesis分析应用程序可以连接到它。

现在,当记录进来时,我想要做的是SUM在过去24小时内的值并存储它。因此,在Kinesis Analytics完成它的工作后,我将它连接到Lambda以完成一些事情。

我的问题是,当我模拟发送一些数据时,在这种情况下5条记录,一切都运行,它运行多次,而不是5.它看起来就像每次记录进入它重做窗口中的所有内容(预期)触发了发射的每一行的lambda。随着桌子的增长,这是个坏消息。我真正想要的只是来自NOW - 24 HOUR的窗口的最新值,以及"id"字段,所以我可以将“id”加入到存储在别处的记录中。

我的应用程序如下所示:

CREATE OR REPLACE STREAM "DEVICE_STREAM" (
    "id" VARCHAR(64),
    "timestamp_mark" TIMESTAMP,
    "device_id" VARCHAR(64),
    "property_a_id" VARCHAR(64),
    "property_b_id" VARCHAR(64),
    "value" DECIMAL
);

CREATE OR REPLACE PUMP "DEVICE_PUMP" AS
    INSERT INTO "DEVICE_STREAM"
    SELECT STREAM "id",
        "timestamp_mark",
        "device_id",
        "x_id",
        "y_id",
        SUM("value") OVER W1 AS "value",
    FROM "SOURCE_SQL_STREAM_001"
    WINDOW W1 AS (
        PARTITION BY "device_id", "property_a_id", "property_b_id" ORDER BY "SOURCE_SQL_STREAM_001".ROWTIME
        RANGE INTERVAL '24' HOUR PRECEDING
    );

嗯..这可能是一个更好的主意,在子选择中进行聚合并从中进行选择。看起来我需要第二个窗口(下面的W2)以确保我获得每个记录。

CREATE OR REPLACE STREAM "DEVICE_STREAM" (
    "id" VARCHAR(64),
    "timestamp_mark" TIMESTAMP,
    "device_id" VARCHAR(64),
    "property_a_id" VARCHAR(64),
    "property_b_id" VARCHAR(64),
    "value" DECIMAL
);

CREATE OR REPLACE PUMP "DEVICE_PUMP" AS
    INSERT INTO "DEVICE_STREAM"
    SELECT STREAM s."id",
        s."timestamp_mark",
        s."device_id",
        s."property_a_id",
        s."property_b_id",
        v."value"
    FROM "SOURCE_SQL_STREAM_001" OVER W2 AS s, (
        SELECT STREAM "SOURCE_SQL_STREAM_001"."ROWTIME", "id",
            "timestamp_mark",
            "device_id",
            "property_a_id",
            "property_b_id",
            SUM("value") OVER W1 AS "value",
            FROM "SOURCE_SQL_STREAM_001"
            WINDOW W1 AS (
                PARTITION BY "device_id", "property_a_id", "property_b_id" ORDER BY "SOURCE_SQL_STREAM_001".ROWTIME
                RANGE INTERVAL '24' HOUR PRECEDING
            )
        ) AS v
    WHERE s."id" = v."id"
    WINDOW W2 AS (
        RANGE INTERVAL '1' SECOND PRECEDING
    );

另外我注意到,如果我重新启动Kinesis Analytics应用程序,SUM值会重置,因此很明显它不会在重新启动时持续存在,这可能使其不适合此解决方案。我可能只需要设置SQL服务器并定期删除旧记录。

amazon-web-services amazon-kinesis
1个回答
0
投票

通常,当您需要根据事件中的数据执行某些操作时,建议使用Streaming Analytics解决方案(特别是Kinesis Analytics),而不是像挂钟时间那样的外部事务。

原因很简单:如果你需要每24小时做一次,你就创建一个工作,从存储(DB)中获取数据一次,执行任务然后再“休眠”24小时 - 没有复杂性,可管理的开销。现在,如果您需要根据数据执行某些操作(例如,当多个事件中某些字段的SUM超过X时),您就会遇到传统解决方案的问题,因为没有简单的标准可以运行它。如果定期运行它,可能会多次调用它,直到满足数据驱动条件,从而产生明显的开销。

在最新的情况下,Streaming Analytics解决方案将按设计使用,并在需要时触发您的逻辑,从而最大限度地减少开销。

如果您更喜欢使用Streaming Analytics(我个人不建议根据您的问题描述),但是在使用Kinesis Analytics语法时,您可能会考虑使用Drools Kinesis Analytics。它的功能包括cronscollectors,它们为您提供了非常简单的按时触发作业的方法。

  • 请注意,因为我是Streamx的首席技术官,所以我的回答是有偏见的。
© www.soinside.com 2019 - 2024. All rights reserved.