我想并行递归地计算数组中所有元素的总和,我使用
multiprocessing
来实现我的想法。
这台电脑上只允许同时执行3个进程,相同颜色矩形内的操作在不同进程中同时执行。
这是我的代码(删除了不相关的部分):
import multiprocessing as mp
import random
def randList(len):
l = []
for _ in range(len):
l.append(random.randint(1, 100))
return l
def init_pool(sharedArr_):
global sharedArr
sharedArr = sharedArr_
class ParallelSum:
def solve(self, l, r):
if l >= r:
return sharedArr[l]
m = l + (r - l) // 2
sumL = self.pool.apply_async(self.solve, args=(l, m)) # compute left part of the array
sumR = self.pool.apply_async(self.solve, args=(m + 1, r)) # compute right part of the array
return sumL.get() + sumR.get()
def computeSum(self, a, l, r):
sharedArr = mp.Array("i", a)
self.pool = mp.Pool(initializer=init_pool, initargs=(sharedArr,))
result = self.pool.apply_async(self.solve, args=(l, r))
print(result.get()) # Error occurs here
self.pool.close()
self.pool.join()
if __name__ == "__main__":
a = randList(int(5000000)) # generating a list of random integers
parallel_sum = ParallelSum()
res = parallel_sum.computeSum(a, 0, len(a) - 1)
print(res)
我遇到了这个错误消息:“池对象无法在进程之间传递或腌制”
这是回溯:
File "...\parallel_sum.py", line 74, in <module>
res = parallel_sum.computeSum(a, 0, len(a) - 1)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "...\parallel_sum.py", line 50, in computeSum
print(result.get())
^^^^^^^^^^^^
File "...\miniconda3\Lib\multiprocessing\pool.py", line 774, in get
raise self._value
File "...\miniconda3\Lib\multiprocessing\pool.py", line 540, in _handle_tasks
put(task)
File "...\miniconda3\Lib\multiprocessing\connection.py", line 205, in send
self._send_bytes(_ForkingPickler.dumps(obj))
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "...\miniconda3\Lib\multiprocessing\reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
File "...\miniconda3\Lib\multiprocessing\pool.py", line 643, in __reduce__
raise NotImplementedError(
NotImplementedError: pool objects cannot be passed between processes or pickled
我想不可能递归地分叉一个新进程。
如何才能走上正轨?我对并行编程和这个 python 库还比较陌生。
错误是告诉你出了什么问题,这只是一些行话。为了序列化(“pickle”)
self.solve
以将其发送到工作进程,它必须序列化self
。要序列化 self
,它必须序列化 self
的属性,包括 self.pool
,这是一个无法序列化的 multiprocessing.Pool
(池中的工作线程无法获得池的所有权以便调度自己的任务)。
Python 一开始就不太支持递归代码(它无法优化尾递归以避免堆栈无限增长),并且试图使其与
multiprocessing.Pool
一起工作是非常难看的。不要这样做。