我有一个很大的(只读)数据数组,我想由多个进程并行处理。
我喜欢Pool.map函数,并希望使用它来并行计算该数据上的函数。
我看到一个人可以使用Value或Array类在进程之间使用共享内存数据。但是当我尝试使用它时,我得到一个RuntimeError:'使用Pool.map函数时,应该仅通过继承在进程之间共享SynchronizedString对象:
这是我要执行的操作的简化示例:
from sys import stdin
from multiprocessing import Pool, Array
def count_it( arr, key ):
count = 0
for c in arr:
if c == key:
count += 1
return count
if __name__ == '__main__':
testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
# want to share it using shared memory
toShare = Array('c', testData)
# this works
print count_it( toShare, "a" )
pool = Pool()
# RuntimeError here
print pool.map( count_it, [(toShare,key) for key in ["a", "b", "s", "d"]] )
谁能告诉我我在这里做错了吗?
因此,我想做的是将有关新创建的共享内存分配数组的信息传递给已在进程池中创建的进程。
[我刚刚看到赏金时再试一次;)
基本上,我认为错误消息的含义是它的意思-多处理共享内存数组不能作为参数传递(通过酸洗)。序列化数据没有意义-关键是数据是共享内存。因此,您必须使共享数组成为全局数组。我认为像我的第一个答案一样,将其作为模块的属性比较整洁,但在示例中将其保留为全局变量也可以很好地工作。考虑到您不想在fork之前设置数据的观点,这是一个修改后的示例。如果您想拥有多个共享数组(这就是为什么要将toShare作为参数传递的原因),您可以类似地创建共享数组的全局列表,然后将索引传递给count_it(它将变为for c in toShare[i]:
) 。
from sys import stdin
from multiprocessing import Pool, Array, Process
def count_it( key ):
count = 0
for c in toShare:
if c == key:
count += 1
return count
if __name__ == '__main__':
# allocate shared array - want lock=False in this case since we
# aren't writing to it and want to allow multiple processes to access
# at the same time - I think with lock=True there would be little or
# no speedup
maxLength = 50
toShare = Array('c', maxLength, lock=False)
# fork
pool = Pool()
# can set data after fork
testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
if len(testData) > maxLength:
raise ValueError, "Shared array too small to hold data"
toShare[:len(testData)] = testData
print pool.map( count_it, ["a", "b", "s", "d"] )
[编辑:由于未使用fork,以上内容在Windows上不起作用。但是,下面的方法在Windows上仍然可以使用Pool,但仍然可以使用,因此我认为这与您想要的最接近:
from sys import stdin
from multiprocessing import Pool, Array, Process
import mymodule
def count_it( key ):
count = 0
for c in mymodule.toShare:
if c == key:
count += 1
return count
def initProcess(share):
mymodule.toShare = share
if __name__ == '__main__':
# allocate shared array - want lock=False in this case since we
# aren't writing to it and want to allow multiple processes to access
# at the same time - I think with lock=True there would be little or
# no speedup
maxLength = 50
toShare = Array('c', maxLength, lock=False)
# fork
pool = Pool(initializer=initProcess,initargs=(toShare,))
# can set data after fork
testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
if len(testData) > maxLength:
raise ValueError, "Shared array too small to hold data"
toShare[:len(testData)] = testData
print pool.map( count_it, ["a", "b", "s", "d"] )
[不知道为什么map不会腌制数组,而Process和Pool会腌制-我认为也许是在Windows上的子进程初始化时转移了它。请注意,虽然在派生之后仍然设置了数据。
我看到的问题是Pool不支持通过其参数列表对共享数据进行酸洗。这就是错误消息的含义,即“对象只能通过继承在进程之间共享”。如果您要使用Pool类共享共享数据,则需要继承共享数据,即全局数据。
如果需要显式传递它们,则可能必须使用multiprocessing.Process。这是您重做的示例:
from multiprocessing import Process, Array, Queue
def count_it( q, arr, key ):
count = 0
for c in arr:
if c == key:
count += 1
q.put((key, count))
if __name__ == '__main__':
testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
# want to share it using shared memory
toShare = Array('c', testData)
q = Queue()
keys = ['a', 'b', 's', 'd']
workers = [Process(target=count_it, args = (q, toShare, key))
for key in keys]
for p in workers:
p.start()
for p in workers:
p.join()
while not q.empty():
print q.get(),
输出:('s',9)('a',2)('b',3)('d',12)
队列元素的顺序可能有所不同。
要使其更通用并且类似于Pool,您可以创建固定的N个进程,将键列表拆分为N个片段,然后使用包装函数作为Process目标,该函数将为每个键调用count_it传递的列表,例如:
def wrapper( q, arr, keys ):
for k in keys:
count_it(q, arr, k)
如果数据是只读的,只需将其设置为模块中的变量[[之前来自Pool的派生。然后,所有子进程都应该能够访问它,并且只要您不对其进行写操作,就不会被复制。
import myglobals # anything (empty .py file)
myglobals.data = []
def count_it( key ):
count = 0
for c in myglobals.data:
if c == key:
count += 1
return count
if __name__ == '__main__':
myglobals.data = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf"
pool = Pool()
print pool.map( count_it, ["a", "b", "s", "d"] )
尽管您可以尝试使用lock=False
关键字参数,但确实想使用Array(默认情况下为true)。>>
RuntimeError: Synchronized objects should only be shared between processes through inheritance
错误,请考虑使用multiprocessing.Manager
,因为它没有此限制。经理考虑到它可能完全在一个单独的过程中运行。import ctypes
import multiprocessing
manager = multiprocessing.Manager()
shared_counter = manager.Value(ctypes.c_ulonglong, 0)