dask dataframe:合并两个数据帧,计算缺失值并写入csv只使用部分CPU(每个CPU占20%)

问题描述 投票:0回答:1

我想合并两个dask数据帧,用列中值计算缺失值并将合并的数据框导出到csv文件。我遇到了一个问题:我当前的代码无法利用所有8个CPU(每个CPU约占20%)

我不确定哪个部分限制了CPU使用率。这是可重复的代码

import numpy as np
import pandas as pd 
df1 = pd.DataFrame(
    np.c_[(np.random.randint(100, size=(10000, 1)), np.random.randn(10000, 3))],
    columns=['id', 'a', 'b', 'c'])
df2 = pd.DataFrame(
    np.c_[(np.array(range(100)), np.random.randn(100, 10000))],
    columns=['id'] + ['d_' + str(i) for i in range(10000)])
df1.id=df1.id.astype(int).astype(object)
df2.id=df2.id.astype(int).astype(object)

## some cells are missing in df2
df2.iloc[:, 1:] = df2.iloc[:,1:].mask(np.random.random(df2.iloc[:, 1:].shape) < .05)

## dask codes starts here
import dask.dataframe as dd
from dask.distributed import Client
ddf1 = dd.from_pandas(df1, npartitions=3)
ddf2 = dd.from_pandas(df2, npartitions=3)
ddf = ddf1.merge(ddf2, how='left', on='id')
ddf = ddf.fillna(ddf.quantile())
ddf.to_csv('train_*.csv', index=None, header=None)

尽管调用了所有8个CPU,但只使用了每个CPU的约20%。我可以编码来提高CPU使用率吗?

dask dask-distributed
1个回答
1
投票

首先,如果你没有另外指定,那么Dask将使用线程来执行。在线程中,一次只能进行一次python操作(“GIL”),除了一些显式释放锁的低级代码。 “合并”操作涉及内存中的大量数据,我怀疑在某些时候释放锁。

其次,所有的输出都被写入文件系统,所以你总是会遇到瓶颈:无论其他处理速度如何快,你仍然需要通过存储总线提供所有这些。

如果CPU正在工作~20%,我敢说这仍然比单核版本更快?简而言之,一些工作负载比其他工作负载更好地并行化。

© www.soinside.com 2019 - 2024. All rights reserved.