Consolidate Prior Identifier from previous Data Load based on Criteria


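I have a data pipeline that pulls from a third-party API on a regular schedule. For a daily pull, for example, I fetch everything from the first timestamp of the day (UTC, ending in T00:00:00.000Z) through the last timestamp of the day (UTC, ending in T23:59:59.999Z).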
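The sketch below shows how those inclusive daily bounds are formed. It is a minimal Python illustration and not part of the original pipeline; daily_window is a hypothetical helper name. Because every load stops at T23:59:59.999Z, a trip that continues past midnight is split across two independently processed loads. That split is why the prior load has to be consulted at all.

```python
from datetime import date, datetime, time, timedelta, timezone


def _fmt(dt: datetime) -> str:
    # ISO-8601 with millisecond precision and a literal "Z" suffix, as in the API bounds
    return dt.strftime("%Y-%m-%dT%H:%M:%S.") + f"{dt.microsecond // 1000:03d}Z"


def daily_window(day: date) -> tuple[str, str]:
    """Return the inclusive [start, end] UTC bounds of a daily pull for `day`."""
    start = datetime.combine(day, time(0, 0, 0), tzinfo=timezone.utc)
    # End is one millisecond before the next midnight, i.e. T23:59:59.999Z
    end = start + timedelta(days=1) - timedelta(milliseconds=1)
    return _fmt(start), _fmt(end)


print(daily_window(date(2023, 3, 1)))
# ('2023-03-01T00:00:00.000Z', '2023-03-01T23:59:59.999Z')
```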

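Each pull is extracted independently of the others. Within each daily pull, my logic assigns identifiers based on some business rules. For real-world context, think of a GPS utility that tracks assets, where I want to apply business logic to define the specific "trips" associated with each asset.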

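I want each subsequent pull to look at the prior load's data for that asset_id. If the data meets the business criteria (for example, the prior load's latest timestamp_utc is within 3 minutes), I want to reuse the previous day's trip_id; otherwise, assign a brand-new id.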
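The rule just described can be expressed as a single decision function. This is a Python model, not Snowflake code, and it rests on a few assumptions:

- carry_or_new_trip_id and MAX_GAP are hypothetical names.
- A gap of exactly 180 seconds counts as "within 3 minutes", which matches the > 180 check used in the SQL attempt.
- A random uuid4 stands in for whatever id scheme is actually used.

```python
import uuid
from datetime import datetime, timedelta
from typing import Optional

MAX_GAP = timedelta(minutes=3)  # business criterion from the question


def parse_utc(ts: str) -> datetime:
    # datetime.fromisoformat only accepts a trailing "Z" from Python 3.11 on, so normalise it
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


def carry_or_new_trip_id(prior_latest_ts: Optional[str], prior_trip_id: Optional[str],
                         first_current_ts: str) -> str:
    """Reuse the prior load's trip_id if the first new point is within MAX_GAP of its latest point."""
    if prior_latest_ts is not None and prior_trip_id is not None:
        gap = parse_utc(first_current_ts) - parse_utc(prior_latest_ts)
        if timedelta(0) <= gap <= MAX_GAP:
            return prior_trip_id
    # Placeholder for a brand-new id: a random UUID here, purely for illustration
    return str(uuid.uuid4())


prior_trip = "4dbf60fc-b8af-41a0-83f4-065ab98b291d"
print(carry_or_new_trip_id("2023-02-28T23:59:57Z", prior_trip, "2023-03-01T00:00:02Z") == prior_trip)  # True (5 s gap)
print(carry_or_new_trip_id("2023-02-28T23:59:57Z", prior_trip, "2023-03-01T06:00:24Z") == prior_trip)  # False
```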

Here are some concrete examples of what the data looks like once identifiers have been assigned.

create or replace temporary table 
    prior_load (asset_id int, timestamp_utc string, trip_id string, load_id string);
insert into 
    prior_load (asset_id, timestamp_utc, trip_id, load_id)
values 
    (374, '2023-02-28T23:59:23Z', '4dbf60fc-b8af-41a0-83f4-065ab98b291d','98c1f408-aafc-467e-908b-09adcd8741da'),
    (374, '2023-02-28T23:59:40Z', '4dbf60fc-b8af-41a0-83f4-065ab98b291d','98c1f408-aafc-467e-908b-09adcd8741da'),
    (374, '2023-02-28T23:59:46Z', '4dbf60fc-b8af-41a0-83f4-065ab98b291d','98c1f408-aafc-467e-908b-09adcd8741da'),
    (374, '2023-02-28T23:59:52Z', '4dbf60fc-b8af-41a0-83f4-065ab98b291d','98c1f408-aafc-467e-908b-09adcd8741da'),
    (374, '2023-02-28T23:59:57Z', '4dbf60fc-b8af-41a0-83f4-065ab98b291d','98c1f408-aafc-467e-908b-09adcd8741da');

create or replace temporary table
    current_load (asset_id int, timestamp_utc string, load_id string);
insert into
    current_load (asset_id, timestamp_utc, load_id)
values
    -- This data comes through in the current day's load
    -- Note: I do not want to assign these trip_ids until I have reconciled them with the prior load's latest data (above)
    (374, '2023-03-01T00:00:02Z', '1822180c-c2ad-4a54-b354-b4180bf6804b'),
    (374, '2023-03-01T00:00:08Z', '1822180c-c2ad-4a54-b354-b4180bf6804b'),
    (374, '2023-03-01T00:00:14Z', '1822180c-c2ad-4a54-b354-b4180bf6804b'),
    -- This data will get its own identifier since it is > 3 minutes after the prior timestamp collected for this asset_id
    (374, '2023-03-01T06:00:24Z', '1822180c-c2ad-4a54-b354-b4180bf6804b'),
    (374, '2023-03-01T06:00:39Z', '1822180c-c2ad-4a54-b354-b4180bf6804b');
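The gaps for asset 374 can be computed directly to make the 3-minute criterion concrete. This Python sketch assumes the current-load timestamps are the ones shown in the desired result (00:00:02 through 06:00:39 on 2023-03-01). Only the jump from 00:00:14 to 06:00:24 exceeds 180 seconds, so that is the only point where a new trip should start. The 5-second gap from the prior load's last point (23:59:57) to 00:00:02 keeps the first three rows on the existing trip.

```python
from datetime import datetime


def parse_utc(ts: str) -> datetime:
    # Normalise the trailing "Z" so datetime.fromisoformat accepts it on Python < 3.11
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


# Prior load's latest point for asset 374, followed by the current load's points
points = [
    "2023-02-28T23:59:57Z",  # latest row from prior_load
    "2023-03-01T00:00:02Z",
    "2023-03-01T00:00:08Z",
    "2023-03-01T00:00:14Z",
    "2023-03-01T06:00:24Z",
    "2023-03-01T06:00:39Z",
]


def gaps_seconds(timestamps: list[str]) -> list[int]:
    """Seconds between each point and the one before it (same asset, sorted by time)."""
    parsed = [parse_utc(t) for t in sorted(timestamps)]
    return [int((b - a).total_seconds()) for a, b in zip(parsed, parsed[1:])]


gaps = gaps_seconds(points)
print(gaps)                     # [5, 6, 6, 21610, 15]
print([g > 180 for g in gaps])  # [False, False, False, True, False]
```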

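What I have tried so far is bringing in the latest prior-load row with a union all and then applying logic across it and the current day's data. Basically, if a row is within 3 minutes of the prior day's data that carries a trip_id, I want to assign it the prior day's trip. I generate identifiers with uuid_string() from asset_id and a trip_counter, namespaced by the load_id.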
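The identifier scheme is deterministic. When UUID_STRING is given a namespace UUID and a name, it generates a version 5 (name-based) UUID. A rough Python analogue using uuid.uuid5 follows. trip_uuid and its sep parameter are hypothetical, and byte-for-byte agreement with Snowflake's output has not been checked.

```python
import hashlib
import uuid


def trip_uuid(load_id: str, asset_id: int, trip_counter: int, sep: str = "") -> str:
    """Version 5 UUID for a trip, namespaced by load_id.

    Mirrors the idea of UUID_STRING(load_id, MD5(CONCAT(asset_id, trip_counter)));
    sep="" reproduces plain concatenation.
    """
    name = hashlib.md5(f"{asset_id}{sep}{trip_counter}".encode("utf-8")).hexdigest()
    return str(uuid.uuid5(uuid.UUID(load_id), name))


load_id = "1822180c-c2ad-4a54-b354-b4180bf6804b"
a = trip_uuid(load_id, 374, 1)
assert a == trip_uuid(load_id, 374, 1)  # same inputs -> same id (deterministic)
assert a != trip_uuid(load_id, 374, 2)  # next trip -> different id
# Without a separator, (37, 41) and (374, 1) both hash the name "3741" and collide
assert trip_uuid(load_id, 37, 41) == trip_uuid(load_id, 374, 1)
assert trip_uuid(load_id, 37, 41, sep="|") != trip_uuid(load_id, 374, 1, sep="|")
```

One side effect is worth noting: CONCAT(asset_id, trip_counter) has no separator, so different (asset_id, trip_counter) pairs can produce the same name within one load. Adding a delimiter avoids that.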

Here is my current attempt at the logic:

with stage_data as (
    select
        asset_id,
        timestamp_utc,
        null as trip_id,
        load_id
    from
        current_load
    union all (
        with latest as (
            select
                asset_id,
                max(timestamp_utc) as latest_timestamp
            from
                prior_load
            group by
                asset_id
        )
        select
            a.asset_id,
            a.timestamp_utc,
            a.trip_id,
            a.load_id
        from
            prior_load as a
        inner join
            latest as b
                on b.asset_id = a.asset_id
                and b.latest_timestamp = a.timestamp_utc
    )
),
boundary_trips as (
    select
        *,
        LAG(trip_id, 1) OVER (PARTITION BY asset_id ORDER BY timestamp_utc) as prior_latest_trip_id,
        LAG(timestamp_utc, 1) OVER (PARTITION BY asset_id ORDER BY timestamp_utc) as prior_timestamp_utc,
        LEAD(timestamp_utc, 1) OVER (PARTITION BY asset_id ORDER BY timestamp_utc) as next_timestamp_utc
    from
        stage_data
),
trips1 as (
    select
        *,
        case when  prior_timestamp_utc is null or abs(datediff(second, prior_timestamp_utc, timestamp_utc)) > 180 then 1 else 0 end as trip_start,
        case when next_timestamp_utc is null or abs(datediff(second, next_timestamp_utc, timestamp_utc)) > 180 then 1 else 0 end as trip_end
    from
        boundary_trips
),
trips2 as (
    select
        *,
        sum(
            trip_start
        ) over(partition by asset_id order by asset_id, timestamp_utc rows between unbounded preceding and current row) as trip_counter
    from
        trips1
)
-- Generate UUID based off of load_id namespace
select
    *,
    UUID_STRING(load_id, MD5(CONCAT(asset_id, trip_counter))) as trip_id
from
    trips2
order by
    asset_id, 
    timestamp_utc;
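The attempt already builds the islands (trip_counter) but then computes a fresh UUID for every row. As a result, the island containing the prior load's row never reuses 4dbf60fc-.... Because the namespace is each row's own load_id, the prior row and the current rows in that island also end up with different ids. The missing step is to let each island inherit a carried-over trip_id when one exists and to mint a new id only otherwise.

The sketch below models that step in Python for a single asset; assign_trips is a hypothetical name. In Snowflake the analogous step would presumably be something along the lines of COALESCE(FIRST_VALUE(trip_id) IGNORE NULLS OVER (PARTITION BY asset_id, trip_counter ORDER BY timestamp_utc), UUID_STRING(...)), followed by dropping the prior-load row. That SQL is untested here.

```python
import hashlib
import uuid
from datetime import datetime
from itertools import groupby
from typing import Optional

MAX_GAP_SECONDS = 180  # "within 3 minutes" from the question


def parse_utc(ts: str) -> datetime:
    # Normalise the trailing "Z" for datetime.fromisoformat on Python < 3.11
    return datetime.fromisoformat(ts.replace("Z", "+00:00"))


def assign_trips(prior_latest: Optional[dict], current: list[dict], load_id: str) -> list[dict]:
    """Gaps-and-islands for ONE asset over [prior load's latest row] + current rows.

    Returns the current rows with trip_id filled in. An island containing the prior
    row inherits its trip_id; any other island gets a new load_id-namespaced id.
    """
    staged = [dict(r, trip_id=None, is_prior=False) for r in current]
    if prior_latest is not None:
        staged.append(dict(prior_latest, is_prior=True))
    staged.sort(key=lambda r: parse_utc(r["timestamp_utc"]))

    # trip_start = 1 on the first row or when the gap exceeds the threshold;
    # trip_counter is the running sum of trip_start (as in the trips2 CTE)
    counter, prev = 0, None
    for r in staged:
        ts = parse_utc(r["timestamp_utc"])
        if prev is None or (ts - prev).total_seconds() > MAX_GAP_SECONDS:
            counter += 1
        r["trip_counter"], prev = counter, ts

    result = []
    for trip_counter, group in groupby(staged, key=lambda r: r["trip_counter"]):
        island = list(group)
        # Like FIRST_VALUE(trip_id) IGNORE NULLS over the island: reuse a carried-over id
        trip_id = next((r["trip_id"] for r in island if r["trip_id"]), None)
        if trip_id is None:
            name = hashlib.md5(f"{island[0]['asset_id']}|{trip_counter}".encode("utf-8")).hexdigest()
            trip_id = str(uuid.uuid5(uuid.UUID(load_id), name))
        result += [
            {"asset_id": r["asset_id"], "timestamp_utc": r["timestamp_utc"],
             "trip_id": trip_id, "load_id": r["load_id"]}
            for r in island if not r["is_prior"]
        ]
    return result


prior = {"asset_id": 374, "timestamp_utc": "2023-02-28T23:59:57Z",
         "trip_id": "4dbf60fc-b8af-41a0-83f4-065ab98b291d",
         "load_id": "98c1f408-aafc-467e-908b-09adcd8741da"}
load = "1822180c-c2ad-4a54-b354-b4180bf6804b"
current = [{"asset_id": 374, "timestamp_utc": t, "load_id": load} for t in (
    "2023-03-01T00:00:02Z", "2023-03-01T00:00:08Z", "2023-03-01T00:00:14Z",
    "2023-03-01T06:00:24Z", "2023-03-01T06:00:39Z")]
out = assign_trips(prior, current, load)
for row in out:
    print(row["timestamp_utc"], row["trip_id"])
```

The first three rows come back with the prior trip_id. The last two share one new deterministic id, which will not equal the placeholder 0a5ae22f-... shown in the desired result.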

Desired result:

create or replace temporary table 
    desired (asset_id int, timestamp_utc string, trip_id string, load_id string);
insert into desired (asset_id, timestamp_utc, trip_id, load_id)
    values 
        -- This data is present from yesterday's load
        (374, '2023-02-28T23:59:23Z', '4dbf60fc-b8af-41a0-83f4-065ab98b291d','98c1f408-aafc-467e-908b-09adcd8741da'),
        (374, '2023-02-28T23:59:40Z', '4dbf60fc-b8af-41a0-83f4-065ab98b291d','98c1f408-aafc-467e-908b-09adcd8741da'),
        (374, '2023-02-28T23:59:46Z', '4dbf60fc-b8af-41a0-83f4-065ab98b291d','98c1f408-aafc-467e-908b-09adcd8741da'),
        (374, '2023-02-28T23:59:52Z', '4dbf60fc-b8af-41a0-83f4-065ab98b291d','98c1f408-aafc-467e-908b-09adcd8741da'),
        (374, '2023-02-28T23:59:57Z', '4dbf60fc-b8af-41a0-83f4-065ab98b291d','98c1f408-aafc-467e-908b-09adcd8741da'),
        -- This data comes through in the current day's load but takes the prior day's load into account
        (374, '2023-03-01T00:00:02Z', '4dbf60fc-b8af-41a0-83f4-065ab98b291d','1822180c-c2ad-4a54-b354-b4180bf6804b'),
        (374, '2023-03-01T00:00:08Z', '4dbf60fc-b8af-41a0-83f4-065ab98b291d','1822180c-c2ad-4a54-b354-b4180bf6804b'),
        (374, '2023-03-01T00:00:14Z', '4dbf60fc-b8af-41a0-83f4-065ab98b291d','1822180c-c2ad-4a54-b354-b4180bf6804b'),
        -- This data gets its own trip identifier because it is > 3 minutes after the prior timestamp
        (374, '2023-03-01T06:00:24Z', '0a5ae22f-4748-48ae-8e29-57a58b1be64c','1822180c-c2ad-4a54-b354-b4180bf6804b'),
        (374, '2023-03-01T06:00:39Z', '0a5ae22f-4748-48ae-8e29-57a58b1be64c','1822180c-c2ad-4a54-b354-b4180bf6804b');

Tags: sql, snowflake-cloud-data-platform, partitioning