I have a multiprocessing script that uses pool.imap_unordered.
I want to use multiprocessing.Lock() in it.
Can you show the correct way to use it in the following script?
import multiprocessing

import pandas as pd


def my_func(df):
    # modify df here
    # ...
    # df = df.head(1)
    return df


if __name__ == "__main__":
    df = pd.DataFrame({'a': [2, 2, 1, 1, 3, 3], 'b': [4, 5, 6, 4, 5, 6], 'c': [4, 5, 6, 4, 5, 6]})
    with multiprocessing.Pool() as pool:
        # one sub-DataFrame per distinct value of column "a"
        groups = (g for _, g in df.groupby("a"))
        print(df)
        print(groups)
        out = []
        for res in pool.imap_unordered(my_func, groups):
            out.append(res)
    final_df = pd.concat(out)
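Here is how you could use a lock. Note that the lock brings no functional benefit in this code, because each worker receives its own copy of a group and no state is shared between them: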
import multiprocessing as mp
from functools import partial

import pandas as pd


def my_func(lock, df):
    # only one worker at a time may hold the lock
    with lock:
        print(df)
    return df


if __name__ == "__main__":
    df = pd.DataFrame(
        {"a": [2, 2, 1, 1, 3, 3], "b": [4, 5, 6, 4, 5, 6], "c": [4, 5, 6, 4, 5, 6]}
    )
    with mp.Manager() as manager:
        # bind the managed lock as the first argument of my_func
        p = partial(my_func, manager.Lock())
        with mp.Pool() as pool:
            groups = (g for _, g in df.groupby("a"))
            out = list(pool.imap_unordered(p, groups))
    print(pd.concat(out))
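Note the use of the multiprocessing.Manager class. It lets you share objects between subprocesses through Manager proxies. A plain multiprocessing.Lock() cannot be passed to the workers as a task argument, because it can only be pickled while a process is being created; the lock returned by manager.Lock() is a proxy and can be pickled freely.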
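Output (the order of the groups can differ between runs, because imap_unordered returns results as they finish):
   a  b  c
2  1  6  6
3  1  4  4
   a  b  c
0  2  4  4
1  2  5  5
   a  b  c
4  3  5  5
5  3  6  6
   a  b  c
2  1  6  6
3  1  4  4
0  2  4  4
1  2  5  5
4  3  5  5
5  3  6  6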