我正在将管道从 pandas 迁移到 polars。数据是停靠在仓库中的卡车的到达和出发记录;在管道的某个步骤中,我需要计算任意给定时间停靠的卡车数量。也就是说,对于每一行(一次停靠),我需要计算在其时间窗口(到达时间 - 1 分钟到出发时间 + 1 分钟)内存在的唯一卡车("ID")的数量。我还没有找到一种高效、且不依赖逐行应用函数(pandas 风格)的方法。
# Sample dock events: one row per truck stop (arrival_time, departure_time, ID).
# Built directly from a printed DataFrame repr for reproducibility.
data = pl.from_repr("""
┌─────────────────────┬─────────────────────┬─────┐
│ arrival_time ┆ departure_time ┆ ID │
│ --- ┆ --- ┆ --- │
│ datetime[μs] ┆ datetime[μs] ┆ str │
╞═════════════════════╪═════════════════════╪═════╡
│ 2023-01-01 06:23:47 ┆ 2023-01-01 06:25:08 ┆ A1 │
│ 2023-01-01 06:26:42 ┆ 2023-01-01 06:28:02 ┆ A1 │
│ 2023-01-01 06:30:20 ┆ 2023-01-01 06:35:01 ┆ A5 │
│ 2023-01-01 06:32:06 ┆ 2023-01-01 06:33:48 ┆ A6 │
│ 2023-01-01 06:33:09 ┆ 2023-01-01 06:36:01 ┆ B3 │
│ 2023-01-01 06:34:08 ┆ 2023-01-01 06:39:49 ┆ C3 │
│ 2023-01-01 06:36:40 ┆ 2023-01-01 06:38:34 ┆ A6 │
│ 2023-01-01 06:37:43 ┆ 2023-01-01 06:40:48 ┆ A5 │
│ 2023-01-01 06:39:48 ┆ 2023-01-01 06:46:10 ┆ A6 │
└─────────────────────┴─────────────────────┴─────┘
""")
我到目前为止的代码:第一部分用 polars,最后一部分仍然使用 pandas:
# Expand each stop's interval by one minute on both sides, then fall back to
# pandas for the row-wise overlap count.
processed_data = (
    data.sort("arrival_time")
    .with_columns(
        pl.col("arrival_time").dt.offset_by("-1m").alias("arrival_time_expanded"),
        pl.col("departure_time").dt.offset_by("1m").alias("departure_time_expanded"),
    )
    .to_pandas()
)


def _overlapping_truck_count(row):
    # Distinct truck IDs whose expanded interval overlaps this row's
    # [arrival_time, departure_time] window.
    overlaps = (processed_data.arrival_time_expanded <= row.departure_time) & (
        processed_data.departure_time_expanded >= row.arrival_time
    )
    return processed_data.loc[overlaps, "ID"].nunique()


# NOTE(review): one full scan of the frame per row — O(n^2); this is the part
# the question wants to replace with a vectorized polars approach.
processed_data = processed_data.assign(
    docked_trucks=processed_data.apply(_overlapping_truck_count, axis=1)
)
结果:
┌──────────────┬────────────────┬─────┬──────────────────────┬─────────────────────┬───────────────┐
│ arrival_time ┆ departure_time ┆ ID ┆ arrival_time_expande ┆ departure_time_expa ┆ docked_trucks │
│ --- ┆ --- ┆ --- ┆ d ┆ nded ┆ --- │
│ datetime[ns] ┆ datetime[ns] ┆ str ┆ --- ┆ --- ┆ i64 │
│ ┆ ┆ ┆ datetime[ns] ┆ datetime[ns] ┆ │
╞══════════════╪════════════════╪═════╪══════════════════════╪═════════════════════╪═══════════════╡
│ 2023-01-01 ┆ 2023-01-01 ┆ A1 ┆ 2023-01-01 06:22:47 ┆ 2023-01-01 06:26:08 ┆ 1 │
│ 06:23:47 ┆ 06:25:08 ┆ ┆ ┆ ┆ │
│ 2023-01-01 ┆ 2023-01-01 ┆ A1 ┆ 2023-01-01 06:25:42 ┆ 2023-01-01 06:29:02 ┆ 1 │
│ 06:26:42 ┆ 06:28:02 ┆ ┆ ┆ ┆ │
│ 2023-01-01 ┆ 2023-01-01 ┆ A5 ┆ 2023-01-01 06:29:20 ┆ 2023-01-01 06:36:01 ┆ 4 │
│ 06:30:20 ┆ 06:35:01 ┆ ┆ ┆ ┆ │
│ 2023-01-01 ┆ 2023-01-01 ┆ A6 ┆ 2023-01-01 06:31:06 ┆ 2023-01-01 06:34:48 ┆ 4 │
│ 06:32:06 ┆ 06:33:48 ┆ ┆ ┆ ┆ │
│ 2023-01-01 ┆ 2023-01-01 ┆ B3 ┆ 2023-01-01 06:32:09 ┆ 2023-01-01 06:37:01 ┆ 4 │
│ 06:33:09 ┆ 06:36:01 ┆ ┆ ┆ ┆ │
│ 2023-01-01 ┆ 2023-01-01 ┆ C3 ┆ 2023-01-01 06:33:08 ┆ 2023-01-01 06:40:49 ┆ 4 │
│ 06:34:08 ┆ 06:39:49 ┆ ┆ ┆ ┆ │
│ 2023-01-01 ┆ 2023-01-01 ┆ A6 ┆ 2023-01-01 06:35:40 ┆ 2023-01-01 06:39:34 ┆ 4 │
│ 06:36:40 ┆ 06:38:34 ┆ ┆ ┆ ┆ │
│ 2023-01-01 ┆ 2023-01-01 ┆ A5 ┆ 2023-01-01 06:36:43 ┆ 2023-01-01 06:41:48 ┆ 3 │
│ 06:37:43 ┆ 06:40:48 ┆ ┆ ┆ ┆ │
│ 2023-01-01 ┆ 2023-01-01 ┆ A6 ┆ 2023-01-01 06:38:48 ┆ 2023-01-01 06:47:10 ┆ 3 │
│ 06:39:48 ┆ 06:46:10 ┆ ┆ ┆ ┆ │
└──────────────┴────────────────┴─────┴──────────────────────┴─────────────────────┴───────────────┘
这可以通过非等值连接来大大简化——但 Polars 尚不支持非等值连接。

另一种方法是使用 .join_asof() 手动查找每个"窗口"边界的开始+结束"行号"。

(注意:我们想为此使用 LazyFrames,因此在本例中调用 .lazy())
# Lazily tag every row with its expanded window bounds and a row index.
df = (
    df.lazy()
    .with_row_index()
    .with_columns(
        pl.col("arrival_time").dt.offset_by("-1m").alias("window_open"),
        pl.col("departure_time").dt.offset_by("1m").alias("window_close"),
    )
)
# Asof-join each window_open against rows sorted by departure_time to find
# the "start" row number of the candidate range; rows with no match get the
# next non-null value via a backward fill.
starts = (
    df.sort("window_open")
    .join_asof(
        df.sort("departure_time").with_row_index("start"),
        left_on="window_open",
        right_on="departure_time",
    )
    .select(pl.col("start").fill_null(strategy="backward"))
)
# Mirror of `starts`: asof-join each window_close forward against rows sorted
# by arrival_time to find the exclusive "end" row number (+ 1) of the range;
# unmatched rows take the previous non-null value via a forward fill.
ends = (
    df.sort("window_close")
    .join_asof(
        df.sort("arrival_time").with_row_index("end"),
        left_on="window_close",
        right_on="arrival_time",
        strategy="forward",
    )
    .select(pl.col("end").fill_null(strategy="forward") + 1)
)
>>> pl.concat([starts, ends], how="horizontal").collect()
shape: (9, 2)
┌───────┬─────┐
│ start ┆ end │
│ --- ┆ --- │
│ u32 ┆ u32 │
╞═══════╪═════╡
│ 0 ┆ 2 │
│ 0 ┆ 3 │
│ 1 ┆ 7 │
│ 1 ┆ 7 │
│ 1 ┆ 8 │
│ 1 ┆ 9 │
│ 3 ┆ 9 │
│ 4 ┆ 9 │
│ 5 ┆ 9 │
└───────┴─────┘
int_ranges() + gather()
然后我们可以创建范围,将其用作索引以从 df 中提取值:
.int_ranges() - 构建范围
.with_context() - 授予访问 df 列
.gather()
- 提取索引处的值ranges = (
# Turn each [start, end) pair into explicit row indices: after explode,
# one row per (window "index", candidate row number) combination.
pl.concat([starts, ends], how="horizontal")
.select(int_ranges = pl.int_ranges("start", "end"))
.with_row_index()
.explode("int_ranges")
)
# Resolve the candidate row numbers back into actual rows of df.
# with_context(df) makes df's columns visible inside the select so that
# gather() can index into them.
windows = (
    ranges.with_context(df)
    .select(
        pl.col("index"),
        pl.col("arrival_time").gather("int_ranges"),
        pl.col("departure_time").gather("int_ranges"),
        pl.col("ID").gather("int_ranges"),
    )
)
>>> windows.collect()
shape: (47, 4)
┌───────┬─────────────────────┬─────────────────────┬─────┐
│ index ┆ arrival_time ┆ departure_time ┆ ID │
│ --- ┆ --- ┆ --- ┆ --- │
│ u32 ┆ datetime[μs] ┆ datetime[μs] ┆ str │
╞═══════╪═════════════════════╪═════════════════════╪═════╡
│ 0 ┆ 2023-01-01 06:23:47 ┆ 2023-01-01 06:25:08 ┆ A1 │
│ 0 ┆ 2023-01-01 06:26:42 ┆ 2023-01-01 06:28:02 ┆ A1 │
│ 1 ┆ 2023-01-01 06:23:47 ┆ 2023-01-01 06:25:08 ┆ A1 │
│ 1 ┆ 2023-01-01 06:26:42 ┆ 2023-01-01 06:28:02 ┆ A1 │
│ 1 ┆ 2023-01-01 06:30:20 ┆ 2023-01-01 06:35:01 ┆ A5 │
│ … ┆ … ┆ … ┆ … │
│ 7 ┆ 2023-01-01 06:39:48 ┆ 2023-01-01 06:46:10 ┆ A6 │
│ 8 ┆ 2023-01-01 06:34:08 ┆ 2023-01-01 06:39:49 ┆ C3 │
│ 8 ┆ 2023-01-01 06:36:40 ┆ 2023-01-01 06:38:34 ┆ A6 │
│ 8 ┆ 2023-01-01 06:37:43 ┆ 2023-01-01 06:40:48 ┆ A5 │
│ 8 ┆ 2023-01-01 06:39:48 ┆ 2023-01-01 06:46:10 ┆ A6 │
└───────┴─────────────────────┴─────────────────────┴─────┘
join() + filter()
现在我们有了可能落在范围内的行,可以将其 join() 回 df,应用过滤逻辑,然后 group_by 并计数。
# Join the candidate rows back, keep only true interval overlaps, then count
# the distinct truck IDs per window.
(
    df.join(windows, on="index", how="left")
    .filter(
        (pl.col("window_open") <= pl.col("departure_time_right"))
        & (pl.col("window_close") >= pl.col("arrival_time_right"))
    )
    .group_by("index", maintain_order=True)
    .agg(
        pl.col("arrival_time").first(),
        pl.col("departure_time").first(),
        pl.col("ID").first(),
        docked_trucks=pl.col("ID_right").n_unique(),
    )
    .collect()
)
shape: (9, 5)
┌───────┬─────────────────────┬─────────────────────┬─────┬───────────────┐
│ index ┆ arrival_time ┆ departure_time ┆ ID ┆ docked_trucks │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ u32 ┆ datetime[μs] ┆ datetime[μs] ┆ str ┆ u32 │
╞═══════╪═════════════════════╪═════════════════════╪═════╪═══════════════╡
│ 0 ┆ 2023-01-01 06:23:47 ┆ 2023-01-01 06:25:08 ┆ A1 ┆ 1 │
│ 1 ┆ 2023-01-01 06:26:42 ┆ 2023-01-01 06:28:02 ┆ A1 ┆ 1 │
│ 2 ┆ 2023-01-01 06:30:20 ┆ 2023-01-01 06:35:01 ┆ A5 ┆ 4 │
│ 3 ┆ 2023-01-01 06:32:06 ┆ 2023-01-01 06:33:48 ┆ A6 ┆ 4 │
│ 4 ┆ 2023-01-01 06:33:09 ┆ 2023-01-01 06:36:01 ┆ B3 ┆ 4 │
│ 5 ┆ 2023-01-01 06:34:08 ┆ 2023-01-01 06:39:49 ┆ C3 ┆ 4 │
│ 6 ┆ 2023-01-01 06:36:40 ┆ 2023-01-01 06:38:34 ┆ A6 ┆ 4 │
│ 7 ┆ 2023-01-01 06:37:43 ┆ 2023-01-01 06:40:48 ┆ A5 ┆ 3 │
│ 8 ┆ 2023-01-01 06:39:48 ┆ 2023-01-01 06:46:10 ┆ A6 ┆ 3 │
└───────┴─────────────────────┴─────────────────────┴─────┴───────────────┘
所以我想出了两个选项,两者都可能会因大量数据而爆炸:
首先进行交叉连接,然后过滤错误的结果,然后进行分组。遗憾的是,由于交叉连接,该查询可能会产生大量数据。
# Option 1: cross join every stop with every other stop, keep only the pairs
# whose expanded intervals overlap, then count per stop.
# NOTE(review): the intermediate cross join is O(n^2) rows.
overlap = (pl.col("arrival_time_expanded") < pl.col("departure_time_right")) & (
    pl.col("departure_time_expanded") > pl.col("arrival_time_right")
)
data = processed_data.join(data, how="cross").filter(overlap)
data.groupby(
    pl.col(
        "arrival_time",
        "departure_time",
        "ID",
        "arrival_time_expanded",
        "departure_time_expanded",
    )
).agg(pl.count())
这对于大数据可能表现得更好(我还没有测试过),但你也会失去一点精度。为了减少基数,我们将到达和出发时间四舍五入到最近的分钟,然后分解(explode)表格,得到每分钟有哪些卡车在仓库中。
# Option 2: bucket time to one-minute precision, explode each stop into the
# minutes it covers, and count trucks present per minute bucket.
time_precision = "1m"

processed_data = data.sort("arrival_time").with_columns(
    pl.col("arrival_time").dt.round(time_precision).dt.offset_by("-1m").alias("arrival_time_expanded"),
    pl.col("departure_time").dt.round(time_precision).dt.offset_by("1m").alias("departure_time_expanded"),
)
# One row per (truck, minute); de-duplicate so a truck counts once per minute.
processed_data = (
    processed_data.with_columns(
        interval=pl.date_range(
            pl.col("arrival_time_expanded"),
            pl.col("departure_time_expanded"),
            interval=time_precision,
            eager=False,
        )
    )
    .explode("interval")
    .unique(["ID", "interval"])
)
# Per-minute occupancy, then the maximum over each stop's minutes.
(
    processed_data.with_columns(pl.count().over("interval"))
    .groupby(
        pl.col(
            "arrival_time",
            "departure_time",
            "ID",
            "arrival_time_expanded",
            "departure_time_expanded",
        )
    )
    .agg(pl.col("count").max())
    .sort("arrival_time")
)
shape: (9, 6)
┌─────────────────────┬─────────────────────┬─────┬───────────────────────┬─────────────────────────┬───────┐
│ arrival_time ┆ departure_time ┆ ID ┆ arrival_time_expanded ┆ departure_time_expanded ┆ count │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ datetime[μs] ┆ datetime[μs] ┆ str ┆ datetime[μs] ┆ datetime[μs] ┆ u32 │
╞═════════════════════╪═════════════════════╪═════╪═══════════════════════╪═════════════════════════╪═══════╡
│ 2023-01-01 06:23:47 ┆ 2023-01-01 06:25:08 ┆ A1 ┆ 2023-01-01 06:23:00 ┆ 2023-01-01 06:26:00 ┆ 1 │
│ 2023-01-01 06:26:42 ┆ 2023-01-01 06:28:02 ┆ A1 ┆ 2023-01-01 06:26:00 ┆ 2023-01-01 06:29:00 ┆ 2 │
│ 2023-01-01 06:30:20 ┆ 2023-01-01 06:35:01 ┆ A5 ┆ 2023-01-01 06:29:00 ┆ 2023-01-01 06:36:00 ┆ 4 │
│ 2023-01-01 06:32:06 ┆ 2023-01-01 06:33:48 ┆ A6 ┆ 2023-01-01 06:31:00 ┆ 2023-01-01 06:35:00 ┆ 4 │
│ 2023-01-01 06:33:09 ┆ 2023-01-01 06:36:01 ┆ B3 ┆ 2023-01-01 06:32:00 ┆ 2023-01-01 06:37:00 ┆ 4 │
│ 2023-01-01 06:34:08 ┆ 2023-01-01 06:39:49 ┆ C3 ┆ 2023-01-01 06:33:00 ┆ 2023-01-01 06:41:00 ┆ 4 │
│ 2023-01-01 06:36:40 ┆ 2023-01-01 06:38:34 ┆ A6 ┆ 2023-01-01 06:36:00 ┆ 2023-01-01 06:40:00 ┆ 4 │
│ 2023-01-01 06:37:43 ┆ 2023-01-01 06:40:48 ┆ A5 ┆ 2023-01-01 06:37:00 ┆ 2023-01-01 06:42:00 ┆ 4 │
│ 2023-01-01 06:39:48 ┆ 2023-01-01 06:46:10 ┆ A6 ┆ 2023-01-01 06:39:00 ┆ 2023-01-01 06:47:00 ┆ 3 │
└─────────────────────┴─────────────────────┴─────┴───────────────────────┴─────────────────────────┴───────┘