我有一个定期从第三方 API 提取数据的数据管道。例如,对于每日拉取,我会拉取从起始时间戳(UTC,以 T00:00:00.000Z 结尾)到结束时间戳(UTC,以 T23:59:59.999Z 结尾)之间的所有数据。
每次拉取的数据彼此独立。我的逻辑是在每天拉取的数据上,根据某些业务规则分配标识符。举一个现实中的例子:设想一个跟踪资产的 GPS 工具,我想应用业务规则来明确定义与该资产相关的“行程”(trip)。
我希望每次后续拉取都查看该 `asset_id` 在上一次加载(prior load)中的数据:如果满足业务条件(例如,上一次加载中最新的 `timestamp_utc` 与当前数据相差不超过 3 分钟),就沿用前一天的 `trip_id`;否则分配一个全新的 id。
下面是一些具体的示例数据:prior_load 是已经分配好标识符的上一次加载,current_load 是尚未分配标识符的当前加载。
-- Fixture: the previous day's load for asset 374.
-- All five rows already carry the same trip_id and the same load_id;
-- the latest row is at 2023-02-28T23:59:57Z.
create or replace temporary table prior_load (
    asset_id int,
    timestamp_utc string,
    trip_id string,
    load_id string
);

insert into prior_load
    (asset_id, timestamp_utc, trip_id, load_id)
values
    (374, '2023-02-28T23:59:23Z', '4dbf60fc-b8af-41a0-83f4-065ab98b291d', '98c1f408-aafc-467e-908b-09adcd8741da'),
    (374, '2023-02-28T23:59:40Z', '4dbf60fc-b8af-41a0-83f4-065ab98b291d', '98c1f408-aafc-467e-908b-09adcd8741da'),
    (374, '2023-02-28T23:59:46Z', '4dbf60fc-b8af-41a0-83f4-065ab98b291d', '98c1f408-aafc-467e-908b-09adcd8741da'),
    (374, '2023-02-28T23:59:52Z', '4dbf60fc-b8af-41a0-83f4-065ab98b291d', '98c1f408-aafc-467e-908b-09adcd8741da'),
    (374, '2023-02-28T23:59:57Z', '4dbf60fc-b8af-41a0-83f4-065ab98b291d', '98c1f408-aafc-467e-908b-09adcd8741da');
-- Fixture: the current day's load (2023-03-01 UTC) for asset 374.
-- There is no trip_id column: trip ids are assigned only after the rows have
-- been reconciled with the latest data of prior_load.
create or replace temporary table
current_load (asset_id int, timestamp_utc string, load_id string);
insert into
current_load (asset_id, timestamp_utc, load_id)
values
-- This data comes through in current day's load
-- Note: I do not want to assign these trip_ids until I have reconciled with prior load's (above) latest data
(374, '2023-03-01T00:00:02Z', '1822180c-c2ad-4a54-b354-b4180bf6804b'),
(374, '2023-03-01T00:00:08Z', '1822180c-c2ad-4a54-b354-b4180bf6804b'),
(374, '2023-03-01T00:00:14Z', '1822180c-c2ad-4a54-b354-b4180bf6804b'),
-- This data will get its own identifier since it is > 3 minutes after the prior timestamp collected for this asset_id
-- (timestamps stay inside the 2023-03-01 load window and match the rows of the desired result)
(374, '2023-03-01T06:00:24Z', '1822180c-c2ad-4a54-b354-b4180bf6804b'),
(374, '2023-03-01T06:00:39Z', '1822180c-c2ad-4a54-b354-b4180bf6804b');
到目前为止,我的做法是通过 union all 把上一次加载中最新的数据并入当前数据,然后结合前一天的数据做逻辑处理。基本上,如果当前数据与前一天的数据相隔不超过 3 分钟,我就想沿用前一天的 `trip_id`。标识符由 `uuid_string()` 生成:以 `load_id` 作为命名空间,以 `asset_id` 和 `trip_counter` 作为名称。
附上我目前的逻辑尝试:
-- Assigns a trip_id to every row of current_load.
--
-- A trip is a run of rows for one asset in which consecutive rows are at most
-- 180 seconds apart. The latest row(s) of prior_load are staged next to the
-- current rows so that a trip which spans the load boundary keeps the trip_id
-- it already received in the previous load; every other trip gets a new UUID
-- generated in the namespace of its load_id.
--
-- Rows carried over from prior_load are emitted too, with their original
-- trip_id; they can be recognised by load_id.
with prior_latest as (
    -- Most recent row(s) per asset from the previous load: the only prior rows
    -- needed to decide whether a trip continues across the load boundary.
    select
        asset_id,
        timestamp_utc,
        trip_id,
        load_id
    from
        prior_load
    qualify
        rank() over (
            partition by asset_id
            order by to_timestamp_tz(timestamp_utc) desc nulls last
        ) = 1
),
stage_data as (
    -- Current rows (no trip yet) plus the boundary rows of the prior load.
    -- event_ts is the parsed timestamp, so ordering and gap arithmetic do not
    -- depend on the textual format of timestamp_utc.
    select
        asset_id,
        timestamp_utc,
        to_timestamp_tz(timestamp_utc) as event_ts,
        cast(null as string) as existing_trip_id,
        load_id
    from
        current_load
    union all
    select
        asset_id,
        timestamp_utc,
        to_timestamp_tz(timestamp_utc) as event_ts,
        trip_id as existing_trip_id,
        load_id
    from
        prior_latest
),
boundary_trips as (
    -- Neighbouring rows per asset. On equal timestamps the prior-load row
    -- (non-null existing_trip_id) sorts ahead of current rows.
    select
        asset_id,
        timestamp_utc,
        event_ts,
        existing_trip_id,
        load_id,
        lag(existing_trip_id) over (
            partition by asset_id order by event_ts, existing_trip_id nulls last
        ) as prior_latest_trip_id,
        lag(timestamp_utc) over (
            partition by asset_id order by event_ts, existing_trip_id nulls last
        ) as prior_timestamp_utc,
        lag(event_ts) over (
            partition by asset_id order by event_ts, existing_trip_id nulls last
        ) as prior_event_ts,
        lead(timestamp_utc) over (
            partition by asset_id order by event_ts, existing_trip_id nulls last
        ) as next_timestamp_utc,
        lead(event_ts) over (
            partition by asset_id order by event_ts, existing_trip_id nulls last
        ) as next_event_ts
    from
        stage_data
),
trips1 as (
    -- Flag the first and last row of each trip; 180 seconds is the
    -- 3-minute business threshold.
    select
        asset_id,
        timestamp_utc,
        event_ts,
        existing_trip_id,
        load_id,
        prior_latest_trip_id,
        prior_timestamp_utc,
        next_timestamp_utc,
        case
            when prior_event_ts is null then 1
            when datediff(second, prior_event_ts, event_ts) > 180 then 1
            else 0
        end as trip_start,
        case
            when next_event_ts is null then 1
            when datediff(second, event_ts, next_event_ts) > 180 then 1
            else 0
        end as trip_end
    from
        boundary_trips
),
trips2 as (
    -- Running count of trip starts = per-asset trip number. A RANGE frame is
    -- used on purpose: rows with identical timestamps are peers and therefore
    -- always land in the same trip, whichever order they were read in.
    select
        asset_id,
        timestamp_utc,
        event_ts,
        existing_trip_id,
        load_id,
        prior_latest_trip_id,
        prior_timestamp_utc,
        next_timestamp_utc,
        trip_start,
        trip_end,
        sum(trip_start) over (
            partition by asset_id
            order by event_ts, existing_trip_id nulls last
            range between unbounded preceding and current row
        ) as trip_counter
    from
        trips1
)
-- Reuse the prior load's trip_id when the trip contains a prior-load row;
-- otherwise generate a UUID in the load_id namespace. The '-' separator keeps
-- (asset_id, trip_counter) pairs such as (1, 12) and (11, 2) from hashing to
-- the same name.
select
    asset_id,
    timestamp_utc,
    coalesce(
        max(existing_trip_id) over (partition by asset_id, trip_counter),
        uuid_string(
            load_id,
            md5(concat(cast(asset_id as string), '-', cast(trip_counter as string)))
        )
    ) as trip_id,
    load_id,
    prior_latest_trip_id,
    prior_timestamp_utc,
    next_timestamp_utc,
    trip_start,
    trip_end,
    trip_counter
from
    trips2
order by
    asset_id,
    event_ts,
    existing_trip_id nulls last;
期望的结果:
-- Expected result: the prior load's rows unchanged, followed by the current
-- load's rows with their trip_id resolved.
create or replace temporary table
desired (asset_id int, timestamp_utc string, trip_id string, load_id string);
insert into desired (asset_id, timestamp_utc, trip_id, load_id)
values
-- This data is present from yesterday's load
(374, '2023-02-28T23:59:23Z', '4dbf60fc-b8af-41a0-83f4-065ab98b291d','98c1f408-aafc-467e-908b-09adcd8741da'),
(374, '2023-02-28T23:59:40Z', '4dbf60fc-b8af-41a0-83f4-065ab98b291d','98c1f408-aafc-467e-908b-09adcd8741da'),
(374, '2023-02-28T23:59:46Z', '4dbf60fc-b8af-41a0-83f4-065ab98b291d','98c1f408-aafc-467e-908b-09adcd8741da'),
(374, '2023-02-28T23:59:52Z', '4dbf60fc-b8af-41a0-83f4-065ab98b291d','98c1f408-aafc-467e-908b-09adcd8741da'),
(374, '2023-02-28T23:59:57Z', '4dbf60fc-b8af-41a0-83f4-065ab98b291d','98c1f408-aafc-467e-908b-09adcd8741da'),
-- This data comes through in the current day's load but considers the prior day's load:
-- it keeps the prior trip_id while carrying the current load_id
(374, '2023-03-01T00:00:02Z', '4dbf60fc-b8af-41a0-83f4-065ab98b291d','1822180c-c2ad-4a54-b354-b4180bf6804b'),
(374, '2023-03-01T00:00:08Z', '4dbf60fc-b8af-41a0-83f4-065ab98b291d','1822180c-c2ad-4a54-b354-b4180bf6804b'),
(374, '2023-03-01T00:00:14Z', '4dbf60fc-b8af-41a0-83f4-065ab98b291d','1822180c-c2ad-4a54-b354-b4180bf6804b'),
-- This data gets its own trip identifier because it is > 3 minutes away from the prior
(374, '2023-03-01T06:00:24Z', '0a5ae22f-4748-48ae-8e29-57a58b1be64c','1822180c-c2ad-4a54-b354-b4180bf6804b'),
(374, '2023-03-01T06:00:39Z', '0a5ae22f-4748-48ae-8e29-57a58b1be64c','1822180c-c2ad-4a54-b354-b4180bf6804b');