我有一个时间序列:
ts = pl.DataFrame(
{
"timestamp": [0, 1, 2, 3, 4, 5],
"start_index": [0, None, None, 3, None, None],
"end_index": [4, None, None, 4, None, None],
}
)
对于每个时间戳索引,我想计算并发事件的数量。例如,
预期输出:
shape: (6, 4)
┌───────────┬─────────────┬───────────┬────────────────────────────┐
│ timestamp ┆ start_index ┆ end_index ┆ number_of_concurrent_event │
│ --- ┆ --- ┆ --- ┆ --- │
│ i64 ┆ i64 ┆ i64 ┆ i64 │
╞═══════════╪═════════════╪═══════════╪════════════════════════════╡
│ 0 ┆ 0 ┆ 4 ┆ 0 │
│ 1 ┆ null ┆ null ┆ 1 │
│ 2 ┆ null ┆ null ┆ 1 │
│ 3 ┆ 3 ┆ 4 ┆ 1 │
│ 4 ┆ null ┆ null ┆ 2 │
│ 5 ┆ null ┆ null ┆ 0 │
└───────────┴─────────────┴───────────┴────────────────────────────┘
start_index
到
end_index
转换为单独的行。由此,我将与原始数据进行
outer_coalesce
连接,然后按时间戳进行分组,将原始时间戳的非空计数作为并发事件。
(
ts
.filter(pl.col('start_index').is_not_null() & pl.col('end_index').is_not_null())
.select(
original_timestamp="timestamp",
timestamp=pl.int_ranges(
pl.col('start_index')+1,
pl.col('end_index')+1
)
)
.explode('timestamp')
.join(ts,
on='timestamp',
how='outer_coalesce'
)
.group_by('timestamp',maintain_order=True)
.agg(
pl.col('start_index','end_index').drop_nulls().first(),
pl.col('original_timestamp').is_not_null().sum()
)
)
shape: (6, 4)
┌───────────┬─────────────┬───────────┬────────────────────────────┐
│ timestamp ┆ start_index ┆ end_index ┆ number_of_concurrent_event │
│ --- ┆ --- ┆ --- ┆ --- │
│ i64 ┆ i64 ┆ i64 ┆ u32 │
╞═══════════╪═════════════╪═══════════╪════════════════════════════╡
│ 0 ┆ 0 ┆ 4 ┆ 0 │
│ 1 ┆ null ┆ null ┆ 1 │
│ 2 ┆ null ┆ null ┆ 1 │
│ 3 ┆ 3 ┆ 4 ┆ 1 │
│ 4 ┆ null ┆ null ┆ 2 │
│ 5 ┆ null ┆ null ┆ 0 │
└───────────┴─────────────┴───────────┴────────────────────────────┘