我正在使用 Polars (Python) 处理数据集,并且对“滚动”分组操作感到困惑。数据如下:
更新于 | last_trade_ts | 询问价格 | 询问数量 | 出价 | 出价数量 |
---|---|---|---|---|---|
2023-12-20 15:30:54 欧洲中部时间 | 2023-12-20 15:30:42 欧洲中部时间 | 114.2 | 1.2 | 109.49 | 0.1 |
2023-12-20 15:31:38 欧洲中部时间 | 2023-12-20 15:30:42 欧洲中部时间 | 112.0 | 15.2 | 109.49 | 0.1 |
2023-12-20 15:31:44 欧洲中部时间 | 2023-12-20 15:30:42 欧洲中部时间 | 112.0 | 13.2 | 109.49 | 0.1 |
2023-12-20 15:31:58 欧洲中部时间 | 2023-12-20 15:31:56 欧洲中部时间 | 112.0 | 1.2 | 109.49 | 0.1 |
2023-12-20 15:33:14 欧洲中部时间 | 2023-12-20 15:31:56 欧洲中部时间 | 112.0 | 1.2 | 109.49 | 0.1 |
2023-12-20 15:33:27 欧洲中部时间 | 2023-12-20 15:31:56 欧洲中部时间 | 112.0 | 1.2 | 109.49 | 0.1 |
2023-12-20 15:33:36 欧洲中部时间 | 2023-12-20 15:31:56 欧洲中部时间 | 112.0 | 1.2 | 109.49 | 0.1 |
2023-12-20 15:33:47 欧洲中部时间 | 2023-12-20 15:31:56 欧洲中部时间 | 112.0 | 1.2 | 109.49 | 0.1 |
我想要做的是在 5 分钟窗口中聚合数据,其开始时间由
updated_at
列设置,但前提是 last_trade_ts
列中有新值。
例如,在上面的片段中,我想要
updated_at BETWEEN '2023-12-20 15:30:54' AND '2023-12-20 15:35:54'
第一组,并且
updated_at BETWEEN '2023-12-20 15:31:58' AND '2023-12-20 15:36:58'
第二个。
当然,一个可能的解决方案是对
.rolling()
应用 updated_at
方法,然后从后续按 last_trade_ts
分组中仅选择第一行。然而,数据集相当大,这种方法涉及大量不必要的计算。
有没有办法执行类似于
.rolling()
的操作,但使用起始索引的子集?
附注如果需要的话,我愿意接受使用其他工具的解决方案。如果有人在 R 中有解决方案,我也可以考虑迁移到它。
.group_by_dynamic()
的解决方案,因为您只能对 every
和 period
参数使用单列。
另一种方法是更经典的类似 sql 的方法。因此,首先我们创建一个单独的 DataFrame,它为我们提供所需组的开始时刻列表。 为此,我们将使用
rle_id()
,每次 last_trade_ts
更改时索引列都会增加 1:
(
df
.with_columns(pl.col("last_trade_ts").rle_id().alias('i'))
)
┌─────────────────────┬─────────────────────┬───────────┬─────┐
│ updated_at ┆ last_trade_ts ┆ ask_price ┆ i │
│ --- ┆ --- ┆ --- ┆ --- │
│ datetime[μs] ┆ datetime[μs] ┆ f64 ┆ u32 │
╞═════════════════════╪═════════════════════╪═══════════╪═════╡
│ 2023-12-20 15:30:54 ┆ 2023-12-20 15:30:42 ┆ 114.2 ┆ 0 │
│ 2023-12-20 15:31:38 ┆ 2023-12-20 15:30:42 ┆ 112.0 ┆ 0 │
│ 2023-12-20 15:31:44 ┆ 2023-12-20 15:30:42 ┆ 112.0 ┆ 0 │
│ 2023-12-20 15:31:58 ┆ 2023-12-20 15:31:56 ┆ 112.0 ┆ 1 │
│ 2023-12-20 15:33:14 ┆ 2023-12-20 15:31:56 ┆ 112.0 ┆ 1 │
│ 2023-12-20 15:33:27 ┆ 2023-12-20 15:31:56 ┆ 112.0 ┆ 1 │
│ 2023-12-20 15:33:36 ┆ 2023-12-20 15:31:56 ┆ 112.0 ┆ 1 │
│ 2023-12-20 15:33:47 ┆ 2023-12-20 15:31:56 ┆ 112.0 ┆ 1 │
└─────────────────────┴─────────────────────┴───────────┴─────┘
现在,我们不需要此列,我们只想将其用作分组索引:
df_groups = (
df
.group_by(pl.col("last_trade_ts").rle_id())
.agg(pl.col("updated_at").first())
)
┌───────────────┬─────────────────────┐
│ last_trade_ts ┆ updated_at │
│ --- ┆ --- │
│ u32 ┆ datetime[μs] │
╞═══════════════╪═════════════════════╡
│ 1 ┆ 2023-12-20 15:31:58 │
│ 0 ┆ 2023-12-20 15:30:54 │
└───────────────┴─────────────────────┘
现在我们要做的就是在
df_groups
的条件下将 df
加入到 df.updated_at between df_groups.updated_at and df_groups.updated_at + 5 minutes
。
不幸的是,极坐标不太适合不等式连接,因此您可以使用
cross
连接,然后使用 .filter()
:
(
df_groups
.join(df, how="cross")
.filter(
pl.col("updated_at_right") >= pl.col("updated_at"),
pl.col("updated_at_right") <= pl.col("updated_at") + pl.duration(minutes=5)
)
.group_by("updated_at")
.agg(pl.col("updated_at_right").max())
)
┌─────────────────────┬─────────────────────┐
│ updated_at ┆ updated_at_right │
│ --- ┆ --- │
│ datetime[μs] ┆ datetime[μs] │
╞═════════════════════╪═════════════════════╡
│ 2023-12-20 15:31:58 ┆ 2023-12-20 15:33:47 │
│ 2023-12-20 15:30:54 ┆ 2023-12-20 15:33:47 │
└─────────────────────┴─────────────────────┘
您还可以使用
duckdb
并使用sql来获取结果:
import duckdb
duckdb.sql("""
select
df2.updated_at as start,
max(df.updated_at) as end
from df2
inner join df on
df.updated_at >= df2.updated_at and
df.updated_at <= df2.updated_at + interval 5 minutes
group by
df2.updated_at
""").pl()
┌─────────────────────┬─────────────────────┐
│ start ┆ end │
│ --- ┆ --- │
│ datetime[μs] ┆ datetime[μs] │
╞═════════════════════╪═════════════════════╡
│ 2023-12-20 15:31:58 ┆ 2023-12-20 15:33:47 │
│ 2023-12-20 15:30:54 ┆ 2023-12-20 15:33:47 │
└─────────────────────┴─────────────────────┘