在Python多处理中传递共享内存变量

问题描述 投票:0回答:1

我有一堆文件,我想使用 Python 的多处理并行读取这些文件,并将所有数据收集到单个 NumPy 数组中。为此,我想定义一个共享内存 NumPy 数组并将其切片传递给不同的进程以并行读取。下面的代码给出了我想要做的事情的玩具插图,我试图使用多重处理来修改 numpy 数组。

示例1:


import numpy as np
import multiprocessing

def do_stuff(i, arr):
    arr[:]=i
    return

def print_error(err):
    print(err)

if __name__ == '__main__':
    idx = [0,1,2,3]
    # Need to fill this array in parallel
    arr = np.zeros(4)
    p = multiprocessing.Pool(4)
    # Passing slices to arr to modify using multiprocessing
    for i in idx:
        p.apply(do_stuff, args=(i,arr[i:i+1]))
    p.close()
    p.join()
    print(arr)

在这段代码中,我希望 arr 填充为 0, 1, 2, 3。然而,这会打印 arr 全为零。阅读答案后here,我使用 multiprocessing.Array 定义共享内存变量并修改我的代码如下

示例2:

import numpy as np
import multiprocessing

def do_stuff(i, arr):
    arr[:]=i
    return

def print_error(err):
    print(err)

if __name__ == '__main__':
    idx = [0,1,2,3]
    p = multiprocessing.Pool(4)
    # Shared memory Array
    shared = multiprocessing.Array('d', 4)
    arr = np.ctypeslib.as_array(shared.get_obj())

    for i in idx:
        p.apply(do_stuff, args=(i,arr[i:i+1]))
    p.close()
    p.join()
    print(arr)

这也会打印 arr 的全零。但是,当我在 main 外部定义数组并使用 pool.map 时,代码可以正常工作。例如,以下代码有效

示例3:

import numpy as np
import multiprocessing

shared = multiprocessing.Array('d', 4)
arr = np.ctypeslib.as_array(shared.get_obj())

def do_stuff(i):
    arr[i]=i
    return

def print_error(err):
    print(err)

if __name__ == '__main__':
    idx = [0,1,2,3]
    p = multiprocessing.Pool(4)
    shared = multiprocessing.Array('d', 4)
    p.map(do_stuff, idx)
    p.close()
    p.join()
    print(arr)
             

这将打印 [0,1,2,3]。

我对这一切感到非常困惑。我的问题是:

  1. 当我定义 arr = np.zeros(4) 时,哪个处理器拥有这个变量?然后,当我将该数组的切片发送到不同的处理器时,如果这些处理器上未定义该变量,则发送的内容是什么。

  2. 为什么示例 2 不起作用而示例 3 起作用?

我正在 Linux 和 Python/3.7/4 上工作

arrays numpy multiprocessing shared-memory
1个回答
2
投票

当我定义 arr = np.zeros(4) 时,哪个处理器拥有这个变量?

只有主进程才能访问它。如果您使用“fork”作为启动方法,则子进程可以访问所有内容,但是一旦有东西尝试修改它,它就会在修改之前复制到它自己的私有内存空间(写入时复制)。如果您有大型只读数组,这会减少开销,但对于将数据写回这些数组并没有多大帮助。

如果这些处理器上未定义此变量,则发送什么内容。

当参数通过管道从主进程发送后重新构造时,会在子进程中创建一个新数组,并且

pickle
。数据被序列化为文本并重新构造,因此除了切片中数据的值之外,不会保留任何信息。这是一个全新的物体。

为什么示例 2 不起作用,而示例 3 却起作用?

示例 3 之所以有效,是因为在“分叉”时(调用

Pool
的那一刻),
arr
已经被创建,并将被共享。同样重要的是,您使用
Array
来创建它,因此当您尝试修改数据时,数据会被共享(其确切机制很复杂)。

示例 2 的工作方式与示例 1 的工作方式不同:您将数组的切片作为参数传递,它将转换为一个全新的对象,因此

arr
函数中的
do_stuff
只是一个主进程中
arr[i:i+1]
的副本。在调用
Pool
之前创建将在进程之间共享的任何内容仍然很重要(如果您依赖“fork”来共享数据),但这并不是此示例不起作用的原因。

你应该知道:示例 3 只能工作,因为你在 Linux 上,默认启动方法是

fork
。这不是首选的启动方法,因为复制处于锁定状态的锁对象可能会发生死锁。这在 Windows 上根本不起作用,并且默认情况下在 3.8 及更高版本的 MacOS 上也不起作用。

所有这些问题的最佳解决方案(最便携)是将

Array
本身作为参数传递,并在子进程内重新构造 numpy 数组。这有一个复杂之处,即“共享对象”只能在创建子进程时作为参数传递。如果您使用
Process
,这并不是什么大问题,但是使用
Pool
,您基本上必须将任何共享对象作为参数传递给初始化函数,并将重新构造的数组作为子级的全局变量范围。例如,在此示例中,尝试将
buf
作为参数传递给
p.map
p.apply
时会出现错误,但将
buf
作为
initargs=(buf,)
传递给
Pool()

时不会出现错误
import numpy as np
from multiprocessing import Pool, Array

def init_child(buf):
    global arr #use global context (for each process) to pass arr to do_stuff
    arr = np.frombuffer(buf.get_obj(), dtype='d')

def do_stuff(i):
    global arr
    arr[i]=i

if __name__ == '__main__':
    idx = [0,1,2,3]
    
    buf = Array('d', 4)
    arr = np.frombuffer(buf.get_obj(), dtype='d')
    arr[:] = 0
    
    #"with" context is easier than writing "close" and "join" all the time
    with Pool(4, initializer=init_child, initargs=(buf,)) as p:
        for i in idx:
            p.apply(do_stuff, args=(i,)) #you could pass more args to get slice indices too
    print(arr)

3.8 及以上版本有一个新模块,它比

Array
或任何其他
sharedctypes
类更好,称为:
shared_memory
。这使用起来有点复杂,并且有一些额外的依赖于操作系统的麻烦,但理论上它的开销更低,速度更快。如果你想深入了解,我已经写了一篇关于 shared_memory 主题的 few
 
answers
,并且最近回答了很多关于并发的一般问题,如果你想看看我过去一两个月的回答。

© www.soinside.com 2019 - 2024. All rights reserved.