I have a very large (read-only) array of data that I want to be processed by multiple processes in parallel.

I like the Pool.map function and would like to use it to compute functions on that data in parallel.

I saw that one can use the Value or Array classes to share memory between processes. But when I try to use them, I get RuntimeError: 'SynchronizedString objects should only be shared between processes through inheritance' when using the Pool.map function.

Here is a simplified example of what I am trying to do:
from sys import stdin
from multiprocessing import Pool, Array

def count_it( arr, key ):
    count = 0
    for c in arr:
        if c == key:
            count += 1
    return count

if __name__ == '__main__':
    testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
    # want to share it using shared memory
    toShare = Array('c', testData)

    # this works
    print count_it( toShare, "a" )

    pool = Pool()

    # RuntimeError here
    print pool.map( count_it, [(toShare,key) for key in ["a", "b", "s", "d"]] )
Can anyone tell me what I am doing wrong here?

So what I would like to do is pass information about a newly created, shared-memory-allocated array to the processes after they have been created in the process pool.
Trying again, as I just saw the bounty ;)
Basically I think the error message means what it says - multiprocessing shared-memory Arrays can't be passed as arguments (by pickling). It doesn't make sense to serialise the data - the point is that the data is shared memory. So you have to make the shared array global. I think it's neater to make it an attribute of a module, as in my first answer, but just leaving it as a global variable in your example also works well. Taking into account your point of not wanting to set the data before the fork, here is a modified example. If you wanted to have more than one possible shared array (which is why you wanted to pass toShare as an argument), you could similarly make a global list of shared arrays and just pass the index to count_it (the loop would become for c in toShare[i]:); a sketch of that variant follows the example below.
from sys import stdin
from multiprocessing import Pool, Array, Process

def count_it( key ):
    count = 0
    for c in toShare:
        if c == key:
            count += 1
    return count

if __name__ == '__main__':
    # allocate shared array - want lock=False in this case since we
    # aren't writing to it and want to allow multiple processes to access
    # at the same time - I think with lock=True there would be little or
    # no speedup
    maxLength = 50
    toShare = Array('c', maxLength, lock=False)

    # fork
    pool = Pool()

    # can set data after fork
    testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
    if len(testData) > maxLength:
        raise ValueError, "Shared array too small to hold data"

    toShare[:len(testData)] = testData

    print pool.map( count_it, ["a", "b", "s", "d"] )
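For the multi-array variant mentioned above, here is a minimal sketch of my own (not part of the original answer, written in the same Python 2 style as the example above; the name sharedArrays and the (index, key) tuples are illustrative). The shared arrays are created before the fork and looked up by index inside count_it, so only a small picklable index travels through Pool.map:

from multiprocessing import Pool, Array

# Global list of shared arrays; filled in before the fork so children inherit it.
sharedArrays = []

def count_it( args ):
    i, key = args                      # only the index and the key get pickled
    count = 0
    for c in sharedArrays[i]:          # look the shared array up by index
        if c == key:
            count += 1
    return count

if __name__ == '__main__':
    # allocate the shared arrays before forking
    sharedArrays.append(Array('c', 50, lock=False))
    sharedArrays.append(Array('c', 50, lock=False))

    pool = Pool()                      # fork - children inherit sharedArrays

    # data can still be set after the fork, as in the example above
    sharedArrays[0][:4] = "abca"
    sharedArrays[1][:4] = "bbbd"

    print pool.map( count_it, [(0, "a"), (0, "b"), (1, "b"), (1, "d")] )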
[EDIT: The examples above don't work on Windows because fork isn't used there. However, the code below does work on Windows, still using Pool, so I think this is the closest to what you want:
from sys import stdin
from multiprocessing import Pool, Array, Process
import mymodule

def count_it( key ):
    count = 0
    for c in mymodule.toShare:
        if c == key:
            count += 1
    return count

def initProcess(share):
    mymodule.toShare = share

if __name__ == '__main__':
    # allocate shared array - want lock=False in this case since we
    # aren't writing to it and want to allow multiple processes to access
    # at the same time - I think with lock=True there would be little or
    # no speedup
    maxLength = 50
    toShare = Array('c', maxLength, lock=False)

    # fork
    pool = Pool(initializer=initProcess, initargs=(toShare,))

    # can set data after fork
    testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
    if len(testData) > maxLength:
        raise ValueError, "Shared array too small to hold data"

    toShare[:len(testData)] = testData

    print pool.map( count_it, ["a", "b", "s", "d"] )
Not sure why map won't pickle the array but Process and Pool will - I think perhaps it gets transferred at the point of subprocess initialization on Windows. Note that the data is still set after the fork though.
If you are seeing:

RuntimeError: Synchronized objects should only be shared between processes through inheritance

consider using multiprocessing.Manager instead, as it does not have this limitation. The manager works given that it presumably runs in a separate process altogether.
import ctypes
import multiprocessing

# Put this in a method or function, otherwise it will run on import from each module:
manager = multiprocessing.Manager()
counter = manager.Value(ctypes.c_ulonglong, 0)
counter_lock = manager.Lock()  # pylint: disable=no-member

with counter_lock:
    counter.value = count = counter.value + 1
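As a hedged usage sketch of my own (not from the answer above): unlike raw Value/Array objects, the Manager's proxies can be pickled, so they may be passed straight to Pool workers as ordinary arguments; something like the following (Python 3):

import ctypes
import multiprocessing

def bump(args):
    # The proxies arrive as ordinary pickled arguments - no inheritance needed.
    counter, counter_lock = args
    with counter_lock:
        counter.value += 1

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    counter = manager.Value(ctypes.c_ulonglong, 0)
    counter_lock = manager.Lock()

    with multiprocessing.Pool() as pool:
        pool.map(bump, [(counter, counter_lock)] * 10)

    print(counter.value)  # expected: 10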
Alternatively, consider GIL-disabled Python 3.13+. It shares memory between threads implicitly. See free-threaded CPython. Note, however, that single-threaded performance is somewhat slower.
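To illustrate that thread-based route, here is a minimal sketch of my own (an assumption, not from the answer: it only runs in parallel on a free-threaded 3.13+ build, though it still works, serially, under the GIL):

from concurrent.futures import ThreadPoolExecutor

# All threads share the interpreter's memory, so the read-only data is simply
# a module-level variable - no multiprocessing.Array needed.
testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"

def count_it(key):
    return key, sum(1 for c in testData if c == key)

if __name__ == '__main__':
    with ThreadPoolExecutor() as pool:
        print(list(pool.map(count_it, ["a", "b", "s", "d"])))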
If the data is read only, just make it a variable in a module before the fork from Pool. Then all the child processes should be able to access it, and it won't be copied as long as you don't write to it.
from multiprocessing import Pool
import myglobals  # anything (empty .py file)

myglobals.data = []

def count_it( key ):
    count = 0
    for c in myglobals.data:
        if c == key:
            count += 1
    return count

if __name__ == '__main__':
    myglobals.data = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"

    pool = Pool()

    print pool.map( count_it, ["a", "b", "s", "d"] )
If you do want to try to use Array, though, you can try it with the lock=False keyword argument (it is True by default); a small sketch of the difference follows.
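Illustrative sketch only (not from the answer): with the default lock=True you get a synchronized wrapper that carries a process-shared lock, while lock=False hands back the raw shared ctypes array with no synchronization at all, which is fine for read-only data:

from multiprocessing import Array

locked = Array('c', 10)             # lock=True by default: a synchronized wrapper
lock = locked.get_lock()            # the wrapper exposes its process-shared lock

raw = Array('c', 10, lock=False)    # the raw shared ctypes array - no get_lock(),
                                    # no locking overhead, caller synchronizes if needed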
If you need to pass the arrays explicitly, you may have to use multiprocessing.Process. Here is your modified example:
from multiprocessing import Process, Array, Queue

def count_it( q, arr, key ):
    count = 0
    for c in arr:
        if c == key:
            count += 1
    q.put((key, count))

if __name__ == '__main__':
    testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
    # want to share it using shared memory
    toShare = Array('c', testData)

    q = Queue()
    keys = ['a', 'b', 's', 'd']
    workers = [Process(target=count_it, args = (q, toShare, key))
               for key in keys]

    for p in workers:
        p.start()
    for p in workers:
        p.join()

    while not q.empty():
        print q.get(),
Output: ('s', 9) ('a', 2) ('b', 3) ('d', 12)

The order of the queue elements may vary.
To make this more general and more like Pool, you could create a fixed number N of Processes, split the list of keys into N pieces, and then use a wrapper function as the Process target that calls count_it for every key in the list it is passed, like:
def wrapper( q, arr, keys ):
    for k in keys:
        count_it(q, arr, k)
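Putting that together, here is a self-contained sketch of the driver (my own, in the same Python 2 style as the example above; the chunking with keys[i::N] is just one way to split the work):

from multiprocessing import Process, Array, Queue

def count_it( q, arr, key ):
    count = 0
    for c in arr:
        if c == key:
            count += 1
    q.put((key, count))

def wrapper( q, arr, keys ):
    # one Process runs count_it for every key in its chunk
    for k in keys:
        count_it(q, arr, k)

if __name__ == '__main__':
    testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
    toShare = Array('c', testData)
    q = Queue()

    N = 2                                      # fixed number of worker processes
    keys = ['a', 'b', 's', 'd']
    chunks = [keys[i::N] for i in range(N)]    # split the keys into N pieces

    workers = [Process(target=wrapper, args=(q, toShare, chunk))
               for chunk in chunks]
    for p in workers:
        p.start()
    for p in workers:
        p.join()

    while not q.empty():
        print q.get(),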