Dask dataframe merge memory error with large CSV files

Problem description

Here is a simplified version of my code.

import dask
import dask.dataframe as dask_frame
from dask.distributed import Client, LocalCluster


def main():
    cluster = LocalCluster(n_workers=4, threads_per_worker=2)
    client = Client(cluster)

    csv_path_one = "" # both files have 70 columns and around 70 million rows, at a size of about 25 gigabytes
    csv_path_two = "" 

    # the columns are a mix of ints, floats, datetimes, and strings
    # almost all strings are shorter than 15 characters; the two longest string columns have a max length of 70

    left_df = dask_frame.read_csv(csv_path_one, sep="|", quotechar="+", encoding="Latin-1", dtype="object")
    right_df = dask_frame.read_csv(csv_path_two, sep=",", quotechar="\"", encoding="utf-8", dtype="object")

    cand_keys = [""] # I have 3
    merged = dask_frame.merge(left_df, right_df, how='outer', on=cand_keys, suffixes=("_L", "_R"), indicator=True)

    missing_mask = merged["_merge"] != 'both'
    missing_findings: dask_frame.DataFrame = merged.loc[missing_mask, cand_keys + ["_merge"]]

    print(f"Running {client}")
    missing_findings.to_csv("results/findings-*.csv")

    cluster.close()
    client.close()

if __name__ == '__main__':
    main()

This example never finishes. It grinds along until it reaches a certain point, then one or more workers abruptly exceed the memory limit, the nanny kills them, and all of the workers' progress is rolled back. Watching the diagnostics page, the memory spike usually happens around halfway through the shuffle-split tasks.

I am running Dask 2.9.1 on Windows. I could update Dask, but the current setup was troublesome to get working and I don't know whether updating would solve my problem.
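A few adjustments that usually reduce memory pressure in a merge like this are reading only the columns the comparison actually needs (usecols), keeping partitions small with an explicit blocksize, and giving each worker an explicit memory_limit so the nanny intervenes earlier. Below is a minimal sketch along those lines; the empty paths, the placeholder key list, the 64 MB blocksize, and the 4 GB per-worker limit are illustrative assumptions, not values from the original setup.

import dask.dataframe as dask_frame
from dask.distributed import Client, LocalCluster

# cap each worker's memory explicitly so the nanny restarts it before the whole machine swaps
cluster = LocalCluster(n_workers=4, threads_per_worker=2, memory_limit="4GB")
client = Client(cluster)

cand_keys = [""]  # placeholder: the same three candidate key columns as above

# read only the key columns and keep partitions small (64 MB blocks)
left_df = dask_frame.read_csv(
    "",  # csv_path_one
    sep="|", quotechar="+", encoding="Latin-1",
    usecols=cand_keys, dtype="object", blocksize=64 * 1024 ** 2,
)
right_df = dask_frame.read_csv(
    "",  # csv_path_two
    sep=",", quotechar="\"", encoding="utf-8",
    usecols=cand_keys, dtype="object", blocksize=64 * 1024 ** 2,
)

# the merge itself is unchanged; only the inputs are smaller
merged = dask_frame.merge(
    left_df, right_df, how="outer", on=cand_keys,
    suffixes=("_L", "_R"), indicator=True,
)
missing = merged[merged["_merge"] != "both"]
missing.to_csv("results/findings-*.csv")

Reading only the candidate key columns is the biggest saving here, since the output only reports the keys and the merge indicator anyway.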

python dask dask-distributed
1 Answer

Updating to Dask 2.15 resolved this issue.
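For what it's worth, a quick way to confirm which dask and distributed versions are actually active in the environment (a trivial check, nothing specific to this fix):

import dask
import distributed

print("dask", dask.__version__)
print("distributed", distributed.__version__)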
