Dask 忽略有关 parquet 数据集划分的知识

问题描述 投票:0回答:1

我在目录

"dataset_path"
中有一个镶木地板数据集,其中有一个索引列
date
.

元数据由dask创建,相关模式数据如下:

date: timestamp[us]
-- schema metadata --
pandas: '{"index_columns": ["date"], ...

我使用 pyarrow 将数据集读入一个 dask 数据框:

import dask.dataframe as dd

ddf = dd.read_parquet("dataset_path", engine="pyarrow", calculate_divisions=True)

为了确保这些部门是众所周知的,我运行:

print(ddf.divisions)
(Timestamp('2020-01-01 00:00:00'), Timestamp('2020-02-01 00:00:00'), ...

尽管如此,dask 似乎忽略了很多操作中的划分。例如运行以下代码

ddf.loc[ddf.index == pd.Timestamp("2020-01-01")].compute()

启用调试后

import logging

logging.basicConfig(format="%(message)s", level=logging.DEBUG)

输出到日志:

open file: ./dataset_path/part.0.parquet
open file: ./dataset_path/part.1.parquet
open file: ./dataset_path/part.2.parquet
...

因此 dask 处理数据集中的所有文件。即使从部门很明显只有第一个文件可以匹配。

我在这里错过了什么?非常感谢任何帮助我告诉 dask 只查看相关文件的帮助。

其他基于索引的操作也会出现同样的问题。例如在索引上合并

pdf = pd.DataFrame({"y": 0}, index=pd.Index([pd.Timestamp("2020-01-01")]))
dd.merge(ddf, pdf, left_index=True, right_index=True).compute()

同时打开数据集中的所有文件。

我还尝试从 pyarrow 切换到 fastparquet 作为引擎。但问题依然存在。

最后,这里有一些代码来生成

"my_dataset"
的虚拟版本以获得最小的工作示例:

from datetime import date
from dateutil.relativedelta import relativedelta
import pandas as pd

def write_parquet_timeseries(path: str, start: date, end: date):
    for part, pdf in enumerate(generate_timeseries(start, end)):
        dd.to_parquet(
            dd.from_pandas(pdf, npartitions=1),
            path,
            engine="pyarrow",
            overwrite=part == 0,
            append=part > 0,
            write_metadata_file=True,
        )

def generate_timeseries(start: date, end: date):
    start0, end0 = None, start
    while end0 < end:
        start0, end0 = end0, end0 + relativedelta(months=1)
        yield timeseries(start0, end0)

def timeseries(start: date, end: date, num_rows: int = 2**16, num_cols: int = 2**4):
    index = pd.Index(pd.date_range(start, end, inclusive="left"), name="date").repeat(num_rows)
    return pd.DataFrame({f"x{i}": range(i, len(index) + i) for i in range(num_cols)}, index=index)

write_parquet_timeseries("dataset_path", date(2020, 1, 1), date(2021, 1, 1))
dask parquet pyarrow dask-dataframe fastparquet
1个回答
0
投票

Dask 只能将有限的选择操作子集“下推”到加载程序层。例如,如果你更换

ddf.loc[ddf.index == pd.Timestamp("2020-01-01")].compute()

语法更简单

ddf.loc[pd.Timestamp("2020-01-01")].compute()

你应该得到你想要的负载优化。这是因为在第一种情况下,dask 不知道您只想使用

ddf.index
进行选择,因此它会考虑数据框中的所有值。未来更高级别的优化可能能够发现并使用此类模式。

© www.soinside.com 2019 - 2024. All rights reserved.