例如,我使用rosetta.parallel.pandas_easy
来并行化apply
之后的groupby
:
from rosetta.parallel.pandas_easy import groupby_to_series_to_frame
df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]},index= ['g1', 'g1', 'g2'])
groupby_to_series_to_frame(df, np.mean, n_jobs=8, use_apply=True, by=df.index)
但是,有人知道如何并行化返回DataFrame的函数吗?按预期,此代码无法用于rosetta
。
def tmpFunc(df):
df['c'] = df.a + df.b
return df
df.groupby(df.index).apply(tmpFunc)
groupby_to_series_to_frame(df, tmpFunc, n_jobs=1, use_apply=True, by=df.index)
这似乎有效,尽管它确实应该内置在熊猫中
import pandas as pd
from joblib import Parallel, delayed
import multiprocessing
def tmpFunc(df):
df['c'] = df.a + df.b
return df
def applyParallel(dfGrouped, func):
retLst = Parallel(n_jobs=multiprocessing.cpu_count())(delayed(func)(group) for name, group in dfGrouped)
return pd.concat(retLst)
if __name__ == '__main__':
df = pd.DataFrame({'a': [6, 2, 2], 'b': [4, 5, 6]},index= ['g1', 'g1', 'g2'])
print 'parallel version: '
print applyParallel(df.groupby(df.index), tmpFunc)
print 'regular version: '
print df.groupby(df.index).apply(tmpFunc)
print 'ideal version (does not work): '
print df.groupby(df.index).applyParallel(tmpFunc)
Ivan的回答很好,但看起来可以稍微简化,也消除了依赖joblib的需要:
from multiprocessing import Pool, cpu_count
def applyParallel(dfGrouped, func):
with Pool(cpu_count()) as p:
ret_list = p.map(func, [group for name, group in dfGrouped])
return pandas.concat(ret_list)
顺便说一句:这不能代替any groupby.apply(),但是它将涵盖典型的情况:它应该涵盖案例2和案例3 in the documentation,而您应该通过将参数axis=1
分配给最终的pandas.concat()
来获得案例1的行为。
我有一个用于在Pandas中并行化的技巧。我将数据帧分为多个块,将每个块放入列表的元素中,然后使用ipython的并行位对数据帧列表进行并行应用。然后,我使用pandas concat
函数将列表放在一起。
但是,这通常不适用。它对我有用,因为我要应用于数据框的每个块的功能大约需要一分钟。分解并整理我的数据并不需要很长时间。因此,这显然是一个错误。话虽如此,这是一个例子。我正在使用Ipython笔记本,因此您会在我的代码中看到%%time
魔术:
## make some example data
import pandas as pd
np.random.seed(1)
n=10000
df = pd.DataFrame({'mygroup' : np.random.randint(1000, size=n),
'data' : np.random.rand(n)})
grouped = df.groupby('mygroup')
对于此示例,我将基于上述groupby进行“块化”,但这不必是对数据进行分块的方式。虽然这是一个很常见的模式。
dflist = []
for name, group in grouped:
dflist.append(group)
设置并行位
from IPython.parallel import Client
rc = Client()
lview = rc.load_balanced_view()
lview.block = True
编写一个愚蠢的函数以应用于我们的数据
def myFunc(inDf):
inDf['newCol'] = inDf.data ** 10
return inDf
现在让我们先串行运行代码,然后并行运行。序列优先:
%%time
serial_list = map(myFunc, dflist)
CPU times: user 14 s, sys: 19.9 ms, total: 14 s
Wall time: 14 s
现在并行
%%time
parallel_list = lview.map(myFunc, dflist)
CPU times: user 1.46 s, sys: 86.9 ms, total: 1.54 s
Wall time: 1.56 s
然后只需几毫秒即可将它们合并回到一个数据帧中
%%time
combinedDf = pd.concat(parallel_list)
CPU times: user 296 ms, sys: 5.27 ms, total: 301 ms
Wall time: 300 ms
我在MacBook上运行6个IPython引擎,但是您可以看到它将执行时间从14s减少到2s。
对于长期运行的随机模拟,我可以通过使用StarCluster启动集群来使用AWS后端。但是,在很多时候,我只在MBP上跨8个CPU进行并行化。
JD Long的回答附有简短评论。我发现如果组的数量非常大(例如成千上万),并且您的apply函数执行的操作相当简单快捷,则将您的数据帧分解为多个块,然后将每个块分配给一个工人来执行groupby-apply(串行)比并行进行groupby-apply和让工作人员读取包含多个组的队列要快得多。示例:
import pandas as pd
import numpy as np
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
nrows = 15000
np.random.seed(1980)
df = pd.DataFrame({'a': np.random.permutation(np.arange(nrows))})
所以我们的数据框看起来像:
a
0 3425
1 1016
2 8141
3 9263
4 8018
请注意,列“ a”具有许多组(请考虑客户ID):
len(df.a.unique())
15000
可在我们的小组上使用的功能:
def f1(group):
time.sleep(0.0001)
return group
启动池:
ppe = ProcessPoolExecutor(12)
futures = []
results = []
进行并行分组申请:
%%time
for name, group in df.groupby('a'):
p = ppe.submit(f1, group)
futures.append(p)
for future in as_completed(futures):
r = future.result()
results.append(r)
df_output = pd.concat(results)
del ppe
CPU times: user 18.8 s, sys: 2.15 s, total: 21 s
Wall time: 17.9 s
现在添加一列,将df分成更少的组:
df['b'] = np.random.randint(0, 12, nrows)
现在不是12个组,而是15000个组:
len(df.b.unique())
12
我们将对df进行分区,并对每个块进行groupby-apply。
ppe = ProcessPoolExecutor(12)
包装乐趣:
def f2(df):
df.groupby('a').apply(f1)
return df
发送每个要串行处理的块:
%%time
for i in df.b.unique():
p = ppe.submit(f2, df[df.b==i])
futures.append(p)
for future in as_completed(futures):
r = future.result()
results.append(r)
df_output = pd.concat(results)
CPU times: user 11.4 s, sys: 176 ms, total: 11.5 s
Wall time: 12.4 s
请注意,每个组花费的时间没有改变。相反,已更改的是读取工人队列的长度。我怀疑正在发生的情况是,工作人员无法同时访问共享内存,并且不断返回以读取队列,因此互相踩脚。使用较大的块进行操作时,工作人员返回的频率降低,因此此问题得以改善,整体执行速度更快。
我个人建议this thread使用dask。
正如@chrisb指出的那样,在python中使用熊猫进行多处理可能会产生不必要的开销。它也可能[[not的性能与多线程甚至单线程一样好。
Dask是专门为多进程创建的。