I've run into a problem: I'm saving parquet files with slightly different schemas that share their partition columns. I put together the following as a minimal reproducible example:
from dask import dataframe as dd
import pandas as pd
import shutil

def save_parquet():
    df1 = pd.DataFrame({"A": [1], "B": [1]})
    df2 = pd.DataFrame({"A": [2], "C": [2]})
    df1.to_parquet("test.parquet", partition_cols=["A"])
    df2.to_parquet("test.parquet", partition_cols=["A"])

def load_parquet():
    filters = [[
        ("A", "==", 2)
    ]]
    ddf = dd.read_parquet("test.parquet", columns=["A", "C"], filters=filters)

def main():
    save_parquet()
    load_parquet()

if __name__ == "__main__":
    main()
Running the above results in the following exception:
Traceback (most recent call last):
File "/home/.../.local/lib/python3.8/site-packages/dask/backends.py", line 133, in wrapper
return func(*args, **kwargs)
File "/home/.../.local/lib/python3.8/site-packages/dask/dataframe/io/parquet/core.py", line 578, in read_parquet
meta, index, columns = set_index_columns(meta, index, columns, auto_index_allowed)
File "/home/.../.local/lib/python3.8/site-packages/dask/dataframe/io/parquet/core.py", line 1487, in set_index_columns
raise ValueError(
ValueError: The following columns were not found in the dataset {'C'}
The following columns were found Index(['B', 'A'], dtype='object')
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "mre.py", line 26, in <module>
main()
File "mre.py", line 23, in main
load_parquet()
File "mre.py", line 15, in load_parquet
ddf = dd.read_parquet("test.parquet", columns=["A", "C"], filters=filters)
File "/home/.../.local/lib/python3.8/site-packages/dask/backends.py", line 135, in wrapper
raise type(e)(
ValueError: An error occurred while calling the read_parquet method registered to the pandas backend.
Original Message: The following columns were not found in the dataset {'C'}
The following columns were found Index(['B', 'A'], dtype='object')
My expectation was that the ("A", "==", 2) filter would prevent us from loading the schema from df1, and that regardless of whether it loads df1, it should be able to find the "C" column from df2. Am I missing something here?

Changing the columns field to columns=["A", "B"] reads the data successfully, so it feels like what I'm trying to do should be possible.
This post suggests that read_parquet reads the schema from the first parquet file it encounters, but that you can specify a schema to avoid this.
However, specifying a schema like

import pyarrow as pa
...
ddf = dd.read_parquet("test.parquet", columns=["A", "C"], filters=filters, schema=pa.schema({"A": pa.int64(), "C": pa.int64()}))

still raises the exception.
Specifying the schema without specifying columns does not raise the exception, but returns a dask dataframe without a "C" column (apparently regardless of what is in the schema):

>>> print(ddf.columns)
Index(['B', 'A'], dtype='object')
Is there a way to prevent read_parquet from reading .parquet files that should have been filtered out?
Found a somewhat hacky workaround.

dask.dataframe.read_parquet can take a list of parquet files inside the partitions (rather than the top-level parquet folder). I simply filter down to the relevant parquet files I want (based on the partition folders) using a path glob, and pass that list directly.
It's worth pointing out that (py)arrow does not support schema evolution. Fastparquet intends to support it in the near future, and the support currently available is poor (i.e. it may break).
>>> pd.read_parquet("test.parquet", engine="fastparquet", dtypes={"C": "float", "B": "float", "A": "int"})
C B A
0 NaN 1.0 1
1 NaN NaN 2