Dask延迟数据不匹配

问题描述 投票:0回答:1

我希望用 dask 将许多数据帧组合成 1 个数据帧。但是,当我尝试使用 dd.from_delayed(parts, meta=types) 读取这些数据帧时,出现错误

Metadata mismatch found in 'from_delayed'
.

完整错误:

Metadata mismatch found in `from_delayed`.

Partition type: `pandas.core.frame.DataFrame`
+--------+-------+----------+
| Column | Found | Expected |
+--------+-------+----------+
| 'col3' | -     | object   |
+--------+-------+----------+

我知道这是因为我希望合并的数据框没有相同的列。列中不存在的数据应标记为 NA。设置 verify_meta=False 将消除这些错误,但会导致下游出现问题,因为某些分区与元数据不匹配。

代码:

import pandas as pd
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
from dask import delayed
import os

def dict_to_dataframe(dict):
    return pd.DataFrame.from_dict(dict)


data_a = {'col1': [[1, 2, 3, 4], [5, 6, 7, 8]], 'col2': [[9, 10, 11, 12], [13, 14, 15, 16]]}        
data_b = {'col1': [[17, 18, 19, 20], [21, 22, 23, 24]], 'col3': [[25, 26, 27, 28], [29, 30, 31, 32]]}

parts = [delayed(dict_to_dataframe)(fn) for fn in [data_a, data_b]]
types = pd.DataFrame(columns=['col1', 'col2', 'col3'], dtype=object)
ddf_result = dd.from_delayed(parts, meta=types)

print()
print('Write to file')
file_path = os.path.join('test.hdf')
with ProgressBar():
    ddf_result.compute().sort_index().to_hdf(file_path, key=type, format='table')

written = dd.read_hdf(file_path, key=type)
python pandas dask dask-dataframe dask-delayed
1个回答
0
投票

为了允许使用

delayed
创建单个数据框,可以在
delayed
对象的形成过程中强制执行特定模式。在相关答案的基础上构建,可以修改
dict_to_dataframe
以合并业务逻辑,例如:

def dict_to_dataframe(dict, common_columns):
    df = pd.DataFrame.from_dict(dict)
    # keep only the columns known to be common to all dataframes
    df = df[common_columns]
    return df

meta
示例进行适当的修改,其余代码应该可以工作。这是完整的片段:

import dask.dataframe as dd
import pandas as pd
from dask import delayed
from dask.diagnostics import ProgressBar


def dict_to_dataframe(dict, common_columns):
    df = pd.DataFrame.from_dict(dict)
    # keep only the columns known to be common to all dataframes
    df = df[common_columns]
    ...
    return df


data_a = {
    "col1": [[1, 2, 3, 4], [5, 6, 7, 8]],
    "col2": [[9, 10, 11, 12], [13, 14, 15, 16]],
}
data_b = {
    "col1": [[17, 18, 19, 20], [21, 22, 23, 24]],
    "col3": [[25, 26, 27, 28], [29, 30, 31, 32]],
}

common_columns = ["col1"]

parts = [delayed(dict_to_dataframe)(fn, common_columns) for fn in [data_a, data_b]]
types = pd.DataFrame(columns=common_columns, dtype=object)
ddf_result = dd.from_delayed(parts, meta=types)

print("Write to file")
file_path = "test.csv"
with ProgressBar():
    ddf_result.compute().sort_index().to_csv(file_path, index=False)

written = dd.read_csv(file_path)
© www.soinside.com 2019 - 2024. All rights reserved.