I'm very new to Python and I'm running into this error. Code 1:
import multiprocessing as mp
import os

def calc(num1, num2):
    def addi(num1, num2):
        print(num1 + num2)
    m = mp.Process(target=addi, args=(num1, num2))
    m.start()
    print("here is main", os.getpid())
    m.join()

if __name__ == "__main__":
    # creating processes
    calc(5, 6)
ERROR 1 : ForkingPickler(file, protocol).dump(obj)
AttributeError: Can't pickle local object 'calc.<locals>.addi'
After some reading, I learned that pickle cannot be used with local functions, so I also tried the solution below, but I got a different error.
Code 2:
import multiprocessing as mp
import os

def calc(num1, num2):
    global addi  # <-- the line I added
    def addi(num1, num2):
        print(num1 + num2)
    m = mp.Process(target=addi, args=(num1, num2))
    m.start()
    print("here is main", os.getpid())
    m.join()

if __name__ == "__main__":
    # creating processes
    calc(5, 6)
ERROR 2 :
self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'addi' on <module '__mp_main__' from '/Users
Can someone please help me with this? I don't know what to do next! The Python version I'm using is 3.8.9.
Thanks a lot!
Basically, you are getting this error because multiprocessing uses pickle, which in general can only serialize top-level, module-level functions. The function addi is not a top-level, module-level function. In fact, the line global addi does not help here: even though it binds the name addi in the parent process's global namespace, a freshly spawned child process re-imports your module and never runs calc, so addi does not exist there. So you have three approaches to solve this issue.
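The underlying pickle limitation can be reproduced without multiprocessing at all. A minimal sketch (outer and inner are illustrative names, not from the code above):

```python
import pickle

def outer():
    def inner():
        return 42
    return inner

f = outer()
try:
    pickle.dumps(f)
except AttributeError as e:
    # pickle serializes functions by reference (module name + qualified name),
    # and inner's qualified name contains '<locals>', so the lookup must fail
    print(e)  # Can't pickle local object 'outer.<locals>.inner'
```

This is the same failure your Code 1 hits: multiprocessing's ForkingPickler tries to pickle the Process target and finds a function that cannot be located by name at module level.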
Method 1
You can define the addi function in the global scope, before executing calc:
import multiprocessing as mp
import os

def addi(num1, num2):
    print(num1 + num2)

def calc(num1, num2):
    m = mp.Process(target=addi, args=(num1, num2))
    m.start()
    print("here is main", os.getpid())
    m.join()

if __name__ == "__main__":
    # creating processes
    calc(5, 6)
Output
here is main 9924
11
Method 2
You can switch to multiprocess, a fork of multiprocessing which uses dill instead of pickle, and can serialize functions like these.
import multiprocess as mp  # Note that we are importing "multiprocess", no "ing"!
import os

def calc(num1, num2):
    def addi(num1, num2):
        print(num1 + num2)
    m = mp.Process(target=addi, args=(num1, num2))
    m.start()
    print("here is main", os.getpid())
    m.join()

if __name__ == "__main__":
    # creating processes
    calc(5, 6)
Output
here is main 67632
11
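What dill adds over pickle can be seen without any processes involved: it serializes a local function's code by value rather than by name reference. A minimal sketch (dill is a third-party package, installed with pip install dill):

```python
import pickle
import dill  # third-party: pip install dill

def outer():
    def inner(x):
        return x * 2
    return inner

f = outer()

# pickle fails on the local function...
try:
    pickle.dumps(f)
except AttributeError as e:
    print("pickle:", e)

# ...but dill serializes it by value, so the round trip works
g = dill.loads(dill.dumps(f))
print("dill:", g(21))  # dill: 42
```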
Method 2b
While it is a useful library, there are some valid reasons why you may not want to use multiprocess. A big one is the fact that the standard library's multiprocessing and this fork are not compatible with each other (especially if you use anything from the subpackage multiprocessing.managers). This means that if you are using this fork in your own project but also depend on third-party libraries that themselves use the standard library's multiprocessing, you may see unexpected behavior.
Anyway, if you want to stick with the standard library's multiprocessing and not use the fork, you can use dill yourself to serialize Python closures like the function addi, by subclassing the Process class and adding some of our own logic. An example is given below:
import dill
from multiprocessing import Process  # Use the standard library only
import os

class DillProcess(Process):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._target = dill.dumps(self._target)  # Save the target function as bytes, using dill

    def run(self):
        if self._target:
            self._target = dill.loads(self._target)  # Unpickle the target function before executing
            self._target(*self._args, **self._kwargs)  # Execute the target function

def calc(num1, num2):
    def addi(num1, num2):
        print(num1 + num2)
    m = DillProcess(target=addi, args=(num1, num2))  # Note how we use DillProcess, and not multiprocessing.Process
    m.start()
    print("here is main", os.getpid())
    m.join()

if __name__ == "__main__":
    # creating processes
    calc(5, 6)
Output
here is main 23360
11
Method 3
This method is for those who cannot use any third-party libraries in their code. I recommend making sure the above methods do not work for you before resorting to this one, since it is a little hacky and you do need to restructure some of your code.
Anyway, this method works by referencing your local functions from the top-level module scope, so that pickle can access them. To do this dynamically, we create a placeholder class and add all local functions as its class attributes. We also need to make sure the functions' __qualname__ attribute is changed to point to their new location, and that all of this runs outside the if __name__ ... block (otherwise the newly started processes won't see the attributes). Consider this slightly modified version of your code:
import multiprocessing as mp
import os

def calc(num1, num2):
    def addi(num1, num2):
        print(num1 + num2)
    # Another local function you might have
    def addi2():
        print('hahahaha')
    m = mp.Process(target=addi, args=(num1, num2))
    m.start()
    print("here is main", os.getpid())
    m.join()

if __name__ == "__main__":
    # creating processes
    calc(5, 6)
This is how you can make it work using the method detailed above:
import multiprocessing as mp
import os

# This is our placeholder class; all local functions will be added as its attributes
class _LocalFunctions:
    @classmethod
    def add_functions(cls, *args):
        for function in args:
            setattr(cls, function.__name__, function)
            function.__qualname__ = cls.__qualname__ + '.' + function.__name__

def calc(num1, num2, _init=False):
    # The _init parameter is there to initialize all local functions outside the __main__ block without actually
    # running the whole function. Basically, you shift all local function definitions to the top and add them to our
    # _LocalFunctions class. Now, if the _init parameter is True, then this means that the function call was just to
    # initialize the local functions and you SHOULD NOT do anything else. This means that after they are initialized,
    # you simply return (check below)
    def addi(num1, num2):
        print(num1 + num2)
    # Another local function you might have
    def addi2():
        print('hahahaha')
    # Add all functions to the _LocalFunctions class, separating each with a comma:
    _LocalFunctions.add_functions(addi, addi2)
    # IMPORTANT: return and don't actually execute the logic of the function if _init is True!
    if _init is True:
        return
    # Beyond here is where you put the function's actual logic, including any assertions, etc.
    m = mp.Process(target=addi, args=(num1, num2))
    m.start()
    print("here is main", os.getpid())
    m.join()

# All factory functions must be initialized BEFORE the "if __name__ ..." clause. If they require any parameters,
# substitute with bogus ones and make sure to pass the _init parameter as True!
calc(0, 0, _init=True)

if __name__ == '__main__':
    a = calc(5, 6)
So there are a couple of things you need to change in your code: all local functions are defined at the top of the enclosing function, and every factory function needs to be initialized outside the if __name__ ... clause (which is why they need to accept the _init parameter). But if you can't use dill, this is probably the best you can do.
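The trick Method 3 relies on can be shown in isolation: once a local function is reachable as a class attribute and its __qualname__ points there, pickle's by-reference lookup succeeds. A minimal sketch (_Holder, factory, and local_fn are illustrative names, not part of the code above):

```python
import pickle

class _Holder:
    pass  # placeholder whose attributes pickle can look up from module scope

def factory():
    def local_fn(x):
        return x * 2
    # Expose the local function where pickle can find it, and fix its qualified name
    _Holder.local_fn = local_fn
    local_fn.__qualname__ = _Holder.__qualname__ + '.' + local_fn.__name__
    return local_fn

f = factory()  # must run at import time, so spawned children define it too
clone = pickle.loads(pickle.dumps(f))  # now succeeds
print(clone(21))  # 42
```

Without the __qualname__ reassignment, pickle would still see 'factory.<locals>.local_fn' and raise the same "Can't pickle local object" error as before.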
set_start_method('fork') in main
I figured I could add to this question, as I just solved a very similar problem. Sometimes creating a global function is impossible (or inefficient). I think this example explains best what I mean: suppose you have a function foo that takes several variables which won't change, and you want to run some function baz that maps foo over many values of the one argument that does change.
In code it looks like this:
from multiprocessing import Pool

def foo(x, y, z):
    # do whatever here
    return x + y + z

def baz():
    x = 5
    y = 25
    zs = [1, 2, 3, 4, 5]
    unary = lambda z: foo(x, y, z)
    with Pool() as pool:
        results = pool.imap_unordered(unary, zs)
        for result in results:
            # whatever you do with result goes here
            pass
But this won't work, because unary is defined locally (and lambdas cannot be pickled). Instead, we should use partial from functools:
from multiprocessing import Pool
from functools import partial

def foo(x, y, z):
    # do whatever here
    return x + y + z

def baz():
    x = 5
    y = 25
    zs = [1, 2, 3, 4, 5]
    unary = partial(foo, x, y)
    with Pool() as pool:
        results = pool.imap_unordered(unary, zs)
        for result in results:
            # whatever you do with result goes here
            pass
This will work and solves the problem.
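The reason partial works where the lambda fails is that a partial object pickles as a reference to its underlying top-level function plus its bound arguments, both of which pickle cleanly. A minimal sketch:

```python
import pickle
from functools import partial

def foo(x, y, z):
    return x + y + z

unary = partial(foo, 5, 25)

# The partial round-trips through pickle: foo is found by name at module
# level, and the bound arguments (5, 25) are plain picklable values
clone = pickle.loads(pickle.dumps(unary))
print(clone(3))  # 33

# A lambda capturing x and y would fail here the same way the local addi did
```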