我正在多个进程上运行一个函数,它将大型pandas数据帧的字典作为输入。在启动进程时,dict被复制到每个进程,但据我所知,dict只包含对数据帧的引用,因此数据帧本身不会复制到每个进程。这是正确的,还是每个进程都得到了dict的深层副本?
import numpy as np
from multiprocessing import Pool, Process, Manager
def process_dataframe(df_dict, task_queue, result_queue):
while True:
try:
test_value = task_queue.get(block=False)
except:
break
else:
result = {df_name: df[df==test_value].sum() for df_name, df in df_dict.items()}
result_queue.put(result)
if __name__ == '__main__':
manager = Manager()
task_queue = manager.Queue()
result_queue = manager.Queue()
df_dict = {'df_1': some_large_df1, 'df_2': some_large_df2, ...}
test_values = np.random.rand(1000)
for val in test_values:
task_queue.put(val)
with Pool(processes=4) as pool:
processes = []
for _ in range(4):
# Is df_dict copied shallow or deep to each process?
p = pool.Process(target=process_dataframe, args=(df_dict,task_queue,result_queue))
processes.append(p)
p.start()
for p in processes:
p.join()
results = [result_queue.get(block=False) for _ in range(result_queue.qsize())]
TLDR:它确实传递了一个有趣的副本。但不是正常的方式。子进程和父进程共享相同的内存,除非一个或另一个更改对象(在实现copy-on-write的系统上[windows和linux都有此])。在这种情况下,为已更改的对象分配内存。
我坚信,最好是看到一些行动,而不仅仅是被告知,说,让我们进入它。
我为此从网上提取了一些示例multiprocessing
代码。示例代码适合回答此问题的帐单,但它与您问题中的代码不符。
以下所有代码都是一个脚本,但我将分解它来解释每个部分。
首先让我们创建一个dictionary
。我将使用它而不是DataFrame
,因为它们的行为类似,但我不需要安装包来使用它。
注意:id()
语法,它返回对象的唯一标识
# importing the multiprocessing module
import multiprocessing
import sys # So we can see the memory we are using
myDict = dict()
print("My dict ID is:", id(myDict))
myList = [0 for _ in range(10000)] # Just a massive list of all 0s
print('My list size in bytes:', sys.getsizeof(myList))
myDict[0] = myList
print("My dict size with the list:", sys.getsizeof(myDict))
print("My dict ID is still:", id(myDict))
print("But if I copied my dic it would be:", id(myDict.copy()))
对我来说这输出:
我的dict ID是:139687265270016 我的列表大小,以字节为单位:87624 我的dict大小与列表:240 我的dict ID仍然是:139687265270016 但是,如果我复制我的dic,它将是:139687339197496
很酷,所以我们看到id
会改变,如果我们复制对象,我们看到dictionary
只是拿着一个指向list
的指针(因此dict
的内存大小要小得多)。
现在让我们来看看Process
是否复制了字典。
def method1(var):
print("method1 dict id is:", str(id(var)))
def method2(var):
print("method2 dict id is:", str(id(var)))
if __name__ == "__main__":
# creating processes
p1 = multiprocessing.Process(target=method2, args=(myDict, ))
p2 = multiprocessing.Process(target=method1, args=(myDict, ))
# starting process 1
p1.start()
# starting process 2
p2.start()
# wait until process 1 is finished
p1.join()
# wait until process 2 is finished
p2.join()
# both processes finished
print("Done!")
在这里,我将myDict
作为arg
传递给我的子进程函数。这是我得到的输出:
method2 dict id是:139687265270016 method1 dict id是:139687265270016 完成!
注意:id
与我们之前在代码中定义字典时的相同。
如果id
永远不会改变,那么我们在所有实例中使用相同的对象。因此,理论上如果我在Process
进行更改,它应该更改主要对象。但这并不像我们预期的那样发生。
例如:让我们改变我们的method1
。
def method1(var):
print("method1 dict id is:", str(id(var)))
var[0][0] = 1
print("The first five elements of the list in the dict are:", var[0][:5])
并在我们的print
之后添加几个p2.join()
s:
p2.join()
print("The first five elements of the list in the dict are:", myDict[0][:5])
print("The first five elements of the list are:", myList[:5])
我的dict ID是:140077406931128 我的列表大小,以字节为单位:87624 我的dict大小与列表:240 我的dict ID仍然是:140077406931128 但如果我复制我的dic,那将是:140077455160376 method1 dict id是:140077406931128 dict中列表的前五个元素是:[1,0,0,0,0] method2 dict id是:140077406931128 dict中列表的前五个元素是:[0,0,0,0,0] 列表的前五个元素是:[0,0,0,0,0] 完成!
好吧那很有趣...... id
s是一样的,我可以改变函数中的对象,但dict
在主过程中不会改变...
经过一些进一步的调查后,我发现了这个问题/答案:https://stackoverflow.com/a/14750086/8150685
在创建子进程时,子进程会继承父进程的副本(包括id
s的副本!);但是,(如果您使用的操作系统是COW(写时复制),则子项和父项将使用相同的内存,除非子项或父项对数据进行更改,在这种情况下,内存将仅分配给内存你改变的变量(在你的情况下,它会复制你改变的DataFrame
)。
很抱歉很长的帖子,但我认为工作流程很好看。
希望这有所帮助。如果它帮助了你,请不要忘记在https://stackoverflow.com/a/14750086/8150685上提出问题和答案。