dask 的 read_parquet 读取加载模式时应过滤掉的文件

问题描述 投票:0回答:2

我遇到了一个问题,我正在保存架构略有不同但具有共享分区列的不同镶木地板文件。我创建了以下内容作为最小的可重现示例:

from dask import dataframe as dd
import pandas as pd
import shutil

def save_parquet():
    df1 = pd.DataFrame({"A": [1], "B": [1]})
    df2 = pd.DataFrame({"A": [2], "C": [2]})
    df1.to_parquet("test.parquet", partition_cols=["A"])
    df2.to_parquet("test.parquet", partition_cols=["A"])

def load_parquet():
    filters = [[
        ("A", "==", 2)
    ]]
    ddf = dd.read_parquet("test.parquet", columns=["A", "C"], filters=filters)

def main():
    save_parquet()
    load_parquet()

if __name__=="__main__":
    main()

运行上述命令会导致以下异常:

Traceback (most recent call last):
  File "/home/.../.local/lib/python3.8/site-packages/dask/backends.py", line 133, in wrapper
    return func(*args, **kwargs)
  File "/home/.../.local/lib/python3.8/site-packages/dask/dataframe/io/parquet/core.py", line 578, in read_parquet
    meta, index, columns = set_index_columns(meta, index, columns, auto_index_allowed)
  File "/home/.../.local/lib/python3.8/site-packages/dask/dataframe/io/parquet/core.py", line 1487, in set_index_columns
    raise ValueError(
ValueError: The following columns were not found in the dataset {'C'}
The following columns were found Index(['B', 'A'], dtype='object')

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "mre.py", line 26, in <module>
    main()
  File "mre.py", line 23, in main
    load_parquet()
  File "mre.py", line 15, in load_parquet
    ddf = dd.read_parquet("test.parquet", columns=["A", "C"], filters=filters)
  File "/home/.../.local/lib/python3.8/site-packages/dask/backends.py", line 135, in wrapper
    raise type(e)(
ValueError: An error occurred while calling the read_parquet method registered to the pandas backend.
Original Message: The following columns were not found in the dataset {'C'}
The following columns were found Index(['B', 'A'], dtype='object')

我的期望是

("A", "==", 2)
过滤器应该阻止我们从
df1
加载模式,并且无论它是否加载
df1
,它应该能够从
"C"找到
df2
。我在这里错过了什么吗?

columns
字段更改为
columns=["A", "B"]
成功读取数据,所以感觉我想要做的事情应该是可能的。

这篇文章表明 read_parquet 从它遇到的第一个 parquet 文件中读取架构,但您可以指定一个架构来避免这种情况。

指定

schema
就像

import pyarrow as pa

...

ddf = dd.read_parquet("test.parquet", columns=["A", "C"], filters=filters, schema=pa.schema({"A": pa.int64(), "C": pa.int64()}))

仍然会触发异常。

指定

schema
而不指定
columns
不会触发异常,但会返回一个没有“C”列的 dask 数据帧(似乎与架构中的内容无关):

>>> print(ddf.columns)
Index(['B', 'A'], dtype='object')

有没有办法阻止

read_parquet
使用应过滤掉的 .parquet 文件?

python pandas dask parquet pyarrow
2个回答
0
投票

找到了一个有点愚蠢的解决方法。

dask.dataframe.read_parquet 可以获取分区内的 parquet 文件列表(而不是顶级 parquet 文件夹)。我只是使用路径通配符手动过滤到我想要使用的相关镶木地板文件(基于分区文件夹)并直接传递该列表。


0
投票

需要指出的是,(py)箭头不支持模式演化。 Fastparquet 打算在不久的将来这样做,目前可用的支持很差(即可能会破裂)。

>>> pd.read_parquet("test.parquet", engine="fastparquet", dtypes={"C": "float", "B": "float", "A": "int"})

    C    B  A
0 NaN  1.0  1
1 NaN  NaN  2
© www.soinside.com 2019 - 2024. All rights reserved.