我在目录
"dataset_path"
中有一个镶木地板数据集,其中有一个索引列date
.
元数据由dask创建,相关模式数据如下:
date: timestamp[us]
-- schema metadata --
pandas: '{"index_columns": ["date"], ...
我使用 pyarrow 将数据集读入一个 dask 数据框:
import dask.dataframe as dd
ddf = dd.read_parquet("dataset_path", engine="pyarrow", calculate_divisions=True)
为了确保这些部门是众所周知的,我运行:
print(ddf.divisions)
(Timestamp('2020-01-01 00:00:00'), Timestamp('2020-02-01 00:00:00'), ...
尽管如此,dask 似乎忽略了很多操作中的划分。例如运行以下代码
ddf.loc[ddf.index == pd.Timestamp("2020-01-01")].compute()
启用调试后
import logging
logging.basicConfig(format="%(message)s", level=logging.DEBUG)
输出到日志:
open file: ./dataset_path/part.0.parquet
open file: ./dataset_path/part.1.parquet
open file: ./dataset_path/part.2.parquet
...
因此 dask 处理数据集中的所有文件。即使从部门很明显只有第一个文件可以匹配。
我在这里错过了什么?非常感谢任何帮助我告诉 dask 只查看相关文件的帮助。
其他基于索引的操作也会出现同样的问题。例如在索引上合并
pdf = pd.DataFrame({"y": 0}, index=pd.Index([pd.Timestamp("2020-01-01")]))
dd.merge(ddf, pdf, left_index=True, right_index=True).compute()
同时打开数据集中的所有文件。
我还尝试从 pyarrow 切换到 fastparquet 作为引擎。但问题依然存在。
最后,这里有一些代码来生成
"my_dataset"
的虚拟版本以获得最小的工作示例:
from datetime import date
from dateutil.relativedelta import relativedelta
import pandas as pd
def write_parquet_timeseries(path: str, start: date, end: date):
for part, pdf in enumerate(generate_timeseries(start, end)):
dd.to_parquet(
dd.from_pandas(pdf, npartitions=1),
path,
engine="pyarrow",
overwrite=part == 0,
append=part > 0,
write_metadata_file=True,
)
def generate_timeseries(start: date, end: date):
start0, end0 = None, start
while end0 < end:
start0, end0 = end0, end0 + relativedelta(months=1)
yield timeseries(start0, end0)
def timeseries(start: date, end: date, num_rows: int = 2**16, num_cols: int = 2**4):
index = pd.Index(pd.date_range(start, end, inclusive="left"), name="date").repeat(num_rows)
return pd.DataFrame({f"x{i}": range(i, len(index) + i) for i in range(num_cols)}, index=index)
write_parquet_timeseries("dataset_path", date(2020, 1, 1), date(2021, 1, 1))
Dask 只能将有限的选择操作子集“下推”到加载程序层。例如,如果你更换
ddf.loc[ddf.index == pd.Timestamp("2020-01-01")].compute()
语法更简单
ddf.loc[pd.Timestamp("2020-01-01")].compute()
你应该得到你想要的负载优化。这是因为在第一种情况下,dask 不知道您只想使用
ddf.index
进行选择,因此它会考虑数据框中的所有值。未来更高级别的优化可能能够发现并使用此类模式。