给定以下dask数据帧:
import numpy as np
import pandas as pd
import dask.dataframe as dd
N = int(1e4)
df = pd.DataFrame(np.random.randn(N, 3), columns=list('abc'),
index=pd.date_range(datetime.now(), periods=N, freq='1min'))
df['dt'] = pd.to_datetime(df.index.date)
ddf = dd.from_pandas(df, npartitions=5)
ddf
这个慢功能:
def f(grp, M=5):
#A slow function
x = 0
for n in range(M):
for idx1, row in grp[list('abc')].items():
for idx2, v in row.items():
x += v
return x
令我惊讶的是,对于groupby +聚合操作,pandas比dask更快,例如:
%%timeit
res = ddf.groupby('dt').apply(f).compute()
#310 ms ± 3.08 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
与:
%%timeit
res = df.groupby('dt').apply(f)
#149 ms ± 3.76 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
我在这里错过了什么吗?我认为dask会并行化这个计算吗?我的真实用例有数百万行,而且我的聚合函数非常慢。
当数据适合内存时pandas
比dask
快。我想知道你使用的是哪个版本的dask
,因为如果你没有声明你的元数据应用它应该返回一个警告。 (您的问题已编辑,我添加了元数据)。
您可以尝试使用不同数量的分区并使用多处理来运行这些实验以获得更大的N
。
%%timeit -n10
dask <= 0.17.5
res = ddf.groupby('dt').apply(f, meta=('x', 'f8'))\
.compute(get=dask.multiprocessing.get)
%%timeit -n10
dask >= 0.18.0
res = ddf.groupby('dt').apply(f, meta=('x', 'f8'))\
.compute(scheduler='processes')
对于我的笔记本电脑上的qazxsw poi和qazxsw poi,qazxsw poi版本比N=int(1e5)
版本更快。下一步将尝试改善你的功能npartitions=4
。