`ValueError:无法使用 Dask DataFrame 从重复轴重新索引

问题描述 投票:0回答:1

我一直在尝试调整我的代码以利用 Dask 来利用多台机器进行处理。虽然初始数据加载并不耗时,但后续处理在 8 核 i5 上大约需要 12 小时。这并不理想,并且认为使用 Dask 帮助跨机器分散处理将是有益的。以下代码适用于标准 Pandas 方法:

import pandas as pd
artists = pd.read_csv("artists.csv")
print(f"... shape before cleaning {artists.shape}")

artists["name"] = artists["name"].astype("str")

artists["name"] = (
    artists["name"]
    .str.encode("ascii", "ignore")
    .str.decode("ascii")
    .str.lower()
    .str.replace("&", " and ", regex=False)
    .str.strip()
)

转换为 Dask 似乎很简单,但我一路上遇到了一些问题。以下适用于 Dask 的代码会引发

ValueError: cannot reindex from a duplicate axis
错误:

import dask.dataframe as dd
from dask.distributed import Client

artists = dd.read_csv("artists.csv")
print(f"... shape before cleaning {artists.shape}")

artists["name"] = artists["name"].astype(str).compute()
artists["name"] = (
    artists["name"]
    .str.encode("ascii", "ignore")
    .str.decode("ascii")
    .str.lower()
    .str.replace("&", " and ", regex=False)
    .str.strip().compute()
)

if __name__ == '__main__':
    client = Client()

我能看出的最好的情况是 Dask 不允许重新分配到现有的 Dask DataFrame。所以这有效:

...
artists_new = artists["name"].astype("str").compute()
...

但是,我真的不想每次都创建一个新的DataFrame。我宁愿用新的 DataFrame 替换现有的 DataFrame,主要是因为在处理之前我有多个数据清理步骤。

虽然教程和指南很有用,但它们非常基础,不涵盖此类用例。

Dask DataFrames 的首选方法是什么?

python python-3.x pandas dask dask-dataframe
1个回答
3
投票

每次您在 Dask 数据框/系列上调用

.compute()
时,它都会将其转换为
pandas
。那么这一行发生了什么

艺术家[“姓名”] = 艺术家[“姓名”].astype(str).compute()

是您正在计算字符串列,然后将

pandas
系列分配给 dask 系列(不确保分区对齐)。解决方案是仅在最终结果上调用
.compute()
,而中间步骤可以使用常规
pandas
语法:

# modified example (.compute is removed)
artists["name"] = artists["name"].astype(str).str.lower()
© www.soinside.com 2019 - 2024. All rights reserved.