我有一组时间戳事件,对于每个时间戳,我需要每个用户名的“最后”值的总和。这可以通过数据透视表来完成,但我想使用
LazyFrame
,因为有许多唯一的用户名,数据透视表会溢出 RAM。但是,LazyFrame
不支持pivot
。
唯一用户名的数量约为 1000 个,事件约为 10 数百万个。
pivot
和 DataFrame
的工作示例:输入数据框:
timestamp username kudos
i64 str i64
1690886106 "ABC" 123
1690886107 "DEF" 10
1690886110 "DEF" 12
1690886210 "GIH" 0
我可以使用
pivot
来完成任务:
df = pl.DataFrame(
{
"timestamp": [1690886106, 1690886107, 1690886110, 1690886210],
"username": ["ABC", "DEF", "DEF", "GIH"],
"kudos": [123, 10, 12, 0],
},
schema={"timestamp": pl.Int64, "username": pl.Utf8, "kudos": pl.Int64},
)
result = (
df.pivot(
index="timestamp",
columns="username",
values=["kudos"],
aggregate_function="last",
)
.select(pl.all().forward_fill())
.fill_null(strategy="zero")
.select(pl.col("timestamp"), pl.sum_horizontal(df["username"].unique().to_list()))
)
结果正确:
shape: (4, 2)
timestamp sum
i64 i64
1690886106 123
1690886107 133
1690886110 135
1690886210 135
如何使用
LazyFrame
来实现这一点,以便它对于大量唯一用户名是有效的(即使用惰性评估并且可能没有巨大的稀疏数据透视表)?
在这种情况下,您的数据透视表基本上执行两项工作,一项类似于 groupby,另一项是创建时间戳和用户名的所有唯一组合。我们可以在没有枢轴的情况下做到这一点。
首先我们创建独特的组合并将其连接回原始组合...
(df
.select(pl.col('timestamp','username').unique().implode())
.explode('timestamp')
.explode('username')
.join(df, on=['timestamp','username'], how='left'))
shape: (12, 3)
┌────────────┬──────────┬───────┐
│ timestamp ┆ username ┆ kudos │
│ --- ┆ --- ┆ --- │
│ i64 ┆ str ┆ i64 │
╞════════════╪══════════╪═══════╡
│ 1690886106 ┆ DEF ┆ null │
│ 1690886106 ┆ GIH ┆ null │
│ 1690886106 ┆ ABC ┆ 123 │
│ 1690886107 ┆ DEF ┆ 10 │
│ … ┆ … ┆ … │
│ 1690886110 ┆ ABC ┆ null │
│ 1690886210 ┆ DEF ┆ null │
│ 1690886210 ┆ GIH ┆ 0 │
│ 1690886210 ┆ ABC ┆ null │
└────────────┴──────────┴───────┘
其余操作看起来与枢轴后所做的非常相似,唯一的额外细微差别是在
forward_fill.fill_null
步骤中使用窗口函数。
把它们放在一起你有......
df=df.lazy()
(df
.select(pl.col('timestamp','username').unique().implode())
.explode('timestamp')
.explode('username')
.join(df, on=['timestamp','username'], how='left')
.with_columns(pl.col('kudos').forward_fill().fill_null(0).over('username'))
.groupby('timestamp',maintain_order=True)
.agg(pl.col('kudos').sum())
.collect())
shape: (4, 2)
┌────────────┬───────┐
│ timestamp ┆ kudos │
│ --- ┆ --- │
│ i64 ┆ i64 │
╞════════════╪═══════╡
│ 1690886106 ┆ 123 │
│ 1690886107 ┆ 133 │
│ 1690886110 ┆ 135 │
│ 1690886210 ┆ 135 │
└────────────┴───────┘
这应该比具有行求和的数据透视要快得多,因为这两个都没有得到特别好的优化。
更新更大数据
如果数据集几乎没有时间戳和用户名的独特组合,那么上面的规模将会很糟糕,因为它会产生太大的 df。
或者,我们可以使用具有
when.then
的生成器来模拟枢纽的行为
我在尝试使用新的
sum_horizontal
时遇到错误,因此我们还需要一种有效的方法来进行行求和。
从...开始
df = pl.DataFrame({
"timestamp": np.arange(1, 1e6+1),
"username": np.random.randint(0, 1000, size=int(1e6)),
"kudos": np.random.randint(0, 1000, size=int(1e6)), },
schema={"timestamp": pl.Int64,
"username": pl.Utf8,
"kudos": pl.Int64}, )
我们想要创建一系列所有唯一的用户名,我们将使用它们几次
usernames=df.get_column('username').unique()
然后将 df 转换为惰性
df=df.lazy()
现在我们为所有用户名的行总和创建一个表达式,我们必须使用内部
__add__
方法:
rowwise=pl.col(usernames[0]).__add__(pl.col(usernames[1]))
for username in usernames[2:]:
rowwise=rowwise.__add__(pl.col(username))
我尝试像
rowwise=rowwise+pl.col(username)
一样链接它,但它会创建一个巨大的括号,如(a+(b+(c+(d+e(.....))))
,这使得它在未来的步骤中死亡
然后我们做:
result=(df
.select(
['timestamp'] +
[(pl.when(pl.col('username')==x).then(pl.col('kudos'))).alias(x) for x in usernames]
)
.with_columns(pl.col(usernames).forward_fill().fill_null(0))
.select('timestamp', rowwise.alias('sum'))
.collect()
)
第一个
select
模仿枢轴,然后下一个 with_columns
执行与之前相同的前向/填充。最后一个 select 只是实现了按行求和。
我的 jupyter cell 可以在 6.9 秒内完成此操作,而枢轴方法则更接近 9 秒
如果时间戳字段有重复项,您需要执行类似的操作
result.groupby('timestamp', maintain_order=True).agg(pl.col('sum').last())