I want to use dask to combine many dataframes into a single dataframe. However, when I try to read them with `dd.from_delayed(parts, meta=types)`, I get the error `Metadata mismatch found in 'from_delayed'`.
Full error:
Metadata mismatch found in `from_delayed`.
Partition type: `pandas.core.frame.DataFrame`
+--------+-------+----------+
| Column | Found | Expected |
+--------+-------+----------+
| 'col3' | - | object |
+--------+-------+----------+
I know this happens because the dataframes I want to merge do not all have the same columns. Data for columns missing from a given dataframe should be filled with NA. Setting `verify_meta=False` silences the error, but it causes problems downstream because some partitions do not match the metadata.
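For comparison, plain pandas already produces the NA-filled result I am after when concatenating frames with mismatched columns (a minimal sketch with toy data, not my real inputs):

```python
import pandas as pd

# Toy frames with mismatched columns, mirroring my real data
df_a = pd.DataFrame({'col1': [1, 2], 'col2': [3, 4]})
df_b = pd.DataFrame({'col1': [5, 6], 'col3': [7, 8]})

# pandas fills the columns missing from each frame with NaN automatically
combined = pd.concat([df_a, df_b])
print(combined)
```

I would like the dask result to behave the same way.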
Code:
import pandas as pd
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
from dask import delayed
import os

def dict_to_dataframe(data):
    return pd.DataFrame.from_dict(data)

data_a = {'col1': [[1, 2, 3, 4], [5, 6, 7, 8]], 'col2': [[9, 10, 11, 12], [13, 14, 15, 16]]}
data_b = {'col1': [[17, 18, 19, 20], [21, 22, 23, 24]], 'col3': [[25, 26, 27, 28], [29, 30, 31, 32]]}

parts = [delayed(dict_to_dataframe)(fn) for fn in [data_a, data_b]]
types = pd.DataFrame(columns=['col1', 'col2', 'col3'], dtype=object)
ddf_result = dd.from_delayed(parts, meta=types)

print()
print('Write to file')
file_path = os.path.join('test.hdf')
with ProgressBar():
    ddf_result.compute().sort_index().to_hdf(file_path, key='df', format='table')
written = dd.read_hdf(file_path, key='df')
To allow the creation of a single dataframe with `delayed`, a specific schema can be enforced while the `delayed` objects are formed. Building on the related answer, `dict_to_dataframe` can be modified to incorporate the business logic, for example:
def dict_to_dataframe(data, common_columns):
    df = pd.DataFrame.from_dict(data)
    # keep only the columns known to be common to all dataframes
    df = df[common_columns]
    return df
With the corresponding modification to the `meta` example, the rest of the code should work. Here is the full snippet:
import dask.dataframe as dd
import pandas as pd
from dask import delayed
from dask.diagnostics import ProgressBar

def dict_to_dataframe(data, common_columns):
    df = pd.DataFrame.from_dict(data)
    # keep only the columns known to be common to all dataframes
    df = df[common_columns]
    ...
    return df

data_a = {
    "col1": [[1, 2, 3, 4], [5, 6, 7, 8]],
    "col2": [[9, 10, 11, 12], [13, 14, 15, 16]],
}
data_b = {
    "col1": [[17, 18, 19, 20], [21, 22, 23, 24]],
    "col3": [[25, 26, 27, 28], [29, 30, 31, 32]],
}

common_columns = ["col1"]
parts = [delayed(dict_to_dataframe)(fn, common_columns) for fn in [data_a, data_b]]
types = pd.DataFrame(columns=common_columns, dtype=object)
ddf_result = dd.from_delayed(parts, meta=types)

print("Write to file")
file_path = "test.csv"
with ProgressBar():
    ddf_result.compute().sort_index().to_csv(file_path, index=False)
written = dd.read_csv(file_path)