我有一个镶木地板文件目录,希望对所有这些文件应用一个函数并取平均值。
起初我认为 Polars 会凭借 LazyFrame 和流功能在这方面表现出色,但这似乎是一种循环数据块并迭代加载的幼稚方法,其性能优于其他方法。我的实现是否有问题导致了这种情况?
%%time
unique_years = (
pl.scan_parquet(data_glob)
.select(pl.col("time").dt.year().unique())
.collect()
.to_series()
.to_list()
)
results = []
for year in tqdm(unique_years):
results.append(
pl.scan_parquet(data_glob)
.filter(pl.col("time").dt.year()==year)
.select(["time", "market", "close"])
.sort("time")
.with_columns(pl.col("close").log().diff().over("market").alias("log_returns"))
.group_by("time")
.agg(pl.col("log_returns").mean())
.collect()
)
CPU times: user 10min 10s, sys: 4min 44s, total: 14min 55s
Wall time: 2min 4s
%%time
df = (
pl.scan_parquet(data_glob)
.select(["time", "market", "close"])
.sort("time")
.with_columns(pl.col("close").log().diff().over("market").alias("log_returns"))
.group_by("time")
.agg(pl.col("log_returns").mean())
.collect(streaming=True)
)
我认为第一种方法是不正确的,它相对较快地完成,而第二种方法则不会在任何合理的时间内执行。流媒体引擎的效率如此之低,以至于简单的循环就可以超越它,这似乎很奇怪。
流引擎不支持
sort
、diff
、over
和 mean
功能。就像 @ritchie46 提到的,窗口函数是这里最昂贵的函数。
这篇文章测试了Polars中的大部分功能,并得出了一些关于流引擎可以做什么和不能做什么的结论。注意:这些是在一对浮点列上完成的。无论哪种方式,如果您查看查询计划,您都会看到哪些部分未包含在流引擎中并从那里开始。您可以使用
explain(streaming=True)
查看 --- STREAMING
标签之外运行的内容。