我有一堆文件,我想使用 Python 的多处理并行读取这些文件,并将所有数据收集到单个 NumPy 数组中。为此,我想定义一个共享内存 NumPy 数组并将其切片传递给不同的进程以并行读取。下面的代码给出了我想要做的事情的玩具插图,我试图使用多重处理来修改 numpy 数组。
示例1:
import numpy as np
import multiprocessing
def do_stuff(i, arr):
arr[:]=i
return
def print_error(err):
print(err)
if __name__ == '__main__':
idx = [0,1,2,3]
# Need to fill this array in parallel
arr = np.zeros(4)
p = multiprocessing.Pool(4)
# Passing slices to arr to modify using multiprocessing
for i in idx:
p.apply(do_stuff, args=(i,arr[i:i+1]))
p.close()
p.join()
print(arr)
在这段代码中,我希望 arr 填充为 0, 1, 2, 3。然而,这会打印 arr 全为零。阅读答案后here,我使用 multiprocessing.Array 定义共享内存变量并修改我的代码如下
示例2:
import numpy as np
import multiprocessing
def do_stuff(i, arr):
arr[:]=i
return
def print_error(err):
print(err)
if __name__ == '__main__':
idx = [0,1,2,3]
p = multiprocessing.Pool(4)
# Shared memory Array
shared = multiprocessing.Array('d', 4)
arr = np.ctypeslib.as_array(shared.get_obj())
for i in idx:
p.apply(do_stuff, args=(i,arr[i:i+1]))
p.close()
p.join()
print(arr)
这也会打印 arr 的全零。但是,当我在 main 外部定义数组并使用 pool.map 时,代码可以正常工作。例如,以下代码有效
示例3:
import numpy as np
import multiprocessing
shared = multiprocessing.Array('d', 4)
arr = np.ctypeslib.as_array(shared.get_obj())
def do_stuff(i):
arr[i]=i
return
def print_error(err):
print(err)
if __name__ == '__main__':
idx = [0,1,2,3]
p = multiprocessing.Pool(4)
shared = multiprocessing.Array('d', 4)
p.map(do_stuff, idx)
p.close()
p.join()
print(arr)
这将打印 [0,1,2,3]。
我对这一切感到非常困惑。我的问题是:
当我定义 arr = np.zeros(4) 时,哪个处理器拥有这个变量?然后,当我将该数组的切片发送到不同的处理器时,如果这些处理器上未定义该变量,则发送的内容是什么。
为什么示例 2 不起作用而示例 3 起作用?
我正在 Linux 和 Python/3.7/4 上工作
当我定义 arr = np.zeros(4) 时,哪个处理器拥有这个变量?
只有主进程才能访问它。如果您使用“fork”作为启动方法,则子进程可以访问所有内容,但是一旦有东西尝试修改它,它就会在修改之前复制到它自己的私有内存空间(写入时复制)。如果您有大型只读数组,这会减少开销,但对于将数据写回这些数组并没有多大帮助。
如果这些处理器上未定义此变量,则发送什么内容。
当参数通过管道从主进程发送后重新构造时,会在子进程中创建一个新数组,并且
pickle
。数据被序列化为文本并重新构造,因此除了切片中数据的值之外,不会保留任何信息。这是一个全新的物体。
为什么示例 2 不起作用,而示例 3 却起作用?
示例 3 之所以有效,是因为在“分叉”时(调用
Pool
的那一刻),arr
已经被创建,并将被共享。同样重要的是,您使用 Array
来创建它,因此当您尝试修改数据时,数据会被共享(其确切机制很复杂)。
示例 2 的工作方式与示例 1 的工作方式不同:您将数组的切片作为参数传递,它将转换为一个全新的对象,因此
arr
函数中的 do_stuff
只是一个主进程中 arr[i:i+1]
的副本。在调用 Pool
之前创建将在进程之间共享的任何内容仍然很重要(如果您依赖“fork”来共享数据),但这并不是此示例不起作用的原因。
你应该知道:示例 3 只能工作,因为你在 Linux 上,默认启动方法是
fork
。这不是首选的启动方法,因为复制处于锁定状态的锁对象可能会发生死锁。这在 Windows 上根本不起作用,并且默认情况下在 3.8 及更高版本的 MacOS 上也不起作用。
所有这些问题的最佳解决方案(最便携)是将
Array
本身作为参数传递,并在子进程内重新构造 numpy 数组。这有一个复杂之处,即“共享对象”只能在创建子进程时作为参数传递。如果您使用 Process
,这并不是什么大问题,但是使用 Pool
,您基本上必须将任何共享对象作为参数传递给初始化函数,并将重新构造的数组作为子级的全局变量范围。例如,在此示例中,尝试将 buf
作为参数传递给 p.map
或 p.apply
时会出现错误,但将 buf
作为 initargs=(buf,)
传递给 Pool()
时不会出现错误
import numpy as np
from multiprocessing import Pool, Array
def init_child(buf):
global arr #use global context (for each process) to pass arr to do_stuff
arr = np.frombuffer(buf.get_obj(), dtype='d')
def do_stuff(i):
global arr
arr[i]=i
if __name__ == '__main__':
idx = [0,1,2,3]
buf = Array('d', 4)
arr = np.frombuffer(buf.get_obj(), dtype='d')
arr[:] = 0
#"with" context is easier than writing "close" and "join" all the time
with Pool(4, initializer=init_child, initargs=(buf,)) as p:
for i in idx:
p.apply(do_stuff, args=(i,)) #you could pass more args to get slice indices too
print(arr)
3.8 及以上版本有一个新模块,它比
Array
或任何其他 sharedctypes
类更好,称为:shared_memory
。这使用起来有点复杂,并且有一些额外的依赖于操作系统的麻烦,但理论上它的开销更低,速度更快。如果你想深入了解,我已经写了一篇关于 shared_memory
主题的 fewanswers,并且最近回答了很多关于并发的一般问题,如果你想看看我过去一两个月的回答。