我有一个函数f(x)
我想要并行评估值列表xrange
。该函数执行以下操作:
def f(x, wrange, dict1, dict2):
out_list = []
v1 = dict1[x]
for w in wrange:
v2 = dict2[x-w]
out_list += [np.dot(v1, v2)]
return out_list
它从字典dict1
(字典dict2
中的向量)中获取值,然后将它们相乘。现在,我并行执行此操作的常规方法是这样的:
import functools
import multiprocessing
par_func = functools.partial(f, wrange=wrange, dict1=dict1, dict2=dict2)
p = multiprocessing.Pool(4)
ssdat = p.map(par_func, wrange)
p.close()
p.join()
现在当dict1
和dict2
是大字典时,这会导致代码失败并出现错误
File "/anaconda3/lib/python3.6/multiprocessing/connection.py", line 393, in _send_bytes header = struct.pack("!i", n)
struct.error: 'i' format requires -2147483648 <= number <= 2147483647
而且我认为这是因为pool
正在制作dict1
和dict2
的副本,用于评估我的功能。相反,是否有一种有效的方法将这些字典设置为共享内存对象? map
是最好的功能吗?
如果你使用的是基于fork
的系统(阅读:不是Windows),这个问题的一个解决方案是将dict
s置于全局变量中,编写一个不将它们作为参数的函数,而只是从它的访问中获取它们拥有全局,并使用它。 functools.partial
is, unfortunately, unsuited to this use case,但是你的用例可以很容易地用全局变量和def
-ed函数替换:
import multiprocessing
# Assumes wrange/dict1/dict2 defined or imported somewhere at global scope,
# prior to creating the Pool
def par_func(x):
return f(x, wrange, dict1, dict2)
# Using with statement implicitly terminates the pool, saving close/join calls
# and guaranteeing an exception while mapping doesn't leave the pool alive indefinitely
with multiprocessing.Pool(4) as p:
ssdat = p.map(par_func, wrange)
在创建dict1
之后,对dict2
/ Pool
的更改不会反映在进程之间,但您似乎无论如何都以只读方式使用它,所以这不是问题。
如果你在Windows上,或者需要改变dict
s,你可以随时make a multiprocessing.Manager
and make dict
proxies with the dict
method of the manager(这些是共享的dict
s,更新了密钥分配),但它更加丑陋和慢,所以如果可能的话我会劝阻它。
如果要在使用多处理的进程之间共享内存,则需要使用multiprocessing.Array显式共享对象。这并不理想,因为您想要从dicts访问元素并找到正确的数据可能会非常耗时。如果它确实成为您的问题,可能会有这种方法。
正如@Peque所提到的,另一种选择是使用threading。使用线程,内存可以自动在所有进程中共享,但由于global interpreter lock(GIL),您可能会遇到性能问题。 GIL是Python保持线程安全并避免竞争条件的方法。