AttributeError: Can't pickle local object in multiprocessing

Question (votes: 0, answers: 3)

I'm new to Python and ran into this error. Code 1:

import multiprocessing as mp
import os
 
def calc(num1, num2):
    def addi(num1, num2):
        print(num1+num2)
    m = mp.Process(target = addi, args = (num1, num2))
    m.start()

    print("here is main", os.getpid())
    m.join()
  
if __name__ == "__main__":
    # creating processes
    calc(5, 6)

Error 1:
    ForkingPickler(file, protocol).dump(obj)
AttributeError: Can't pickle local object 'calc.<locals>.addi'

After some reading, I learned that pickle cannot handle local functions, so I also tried the fix below, but it produced a different error.

Code 2:

import multiprocessing as mp
import os
   
def calc(num1, num2):
    global addi
    def addi(num1, num2):
        print(num1+num2)
    m = mp.Process(target = addi, args = (num1, num2))
    m.start()

    print("here is main", os.getpid())
    m.join()
  
if __name__ == "__main__":
    # creating processes
    calc(5, 6)

Error 2:
    self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'addi' on <module '__mp_main__' from '/Users

Can someone help me with this? I don't know what to do next! I am using Python 3.8.9.

Thanks a lot!

multiprocessing attributeerror python-3.8
3 Answers

25 votes

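Basically, you get this error because multiprocessing uses pickle, which in general can only serialize functions defined at the top level of a module. The function addi is not a top-level function. In fact, the line global addi does not help here: it only binds the name when calc runs, so a freshly spawned child process that re-imports the module never sees addi at module level. So there are three ways to fix this.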
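
Before going through them, here is a minimal sketch of the underlying pickle behavior, with no multiprocessing involved. The names add_top_level, make_local_add and can_pickle are made up for this illustration. pickle stores a function as a reference to its module and qualified name, so a function whose qualified name contains <locals> cannot be looked up again on the receiving side.

```python
import pickle


def add_top_level(a, b):
    # Defined at module level: pickle can store it as "module + name".
    return a + b


def make_local_add():
    # add_local only exists inside this call, so its qualified name is
    # "make_local_add.<locals>.add_local" and pickle cannot look it up.
    def add_local(a, b):
        return a + b

    return add_local


def can_pickle(obj):
    """Return True if pickle.dumps(obj) succeeds (hypothetical helper)."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError):
        # Depending on the Python version, an unpicklable function raises
        # AttributeError or PicklingError; both mean "not picklable".
        return False


if __name__ == "__main__":
    print(add_top_level.__qualname__)
    print(make_local_add().__qualname__)
    print("top-level picklable:", can_pickle(add_top_level))
    print("local picklable:", can_pickle(make_local_add()))
```

Run as a script, the top-level function should be reported as picklable and the local one as not picklable, which mirrors Error 1 in the question.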

Method 1

You can define the addi function at global (module) scope before calling calc:

import multiprocessing as mp
import os


def addi(num1, num2):
    print(num1 + num2)

def calc(num1, num2):

    m = mp.Process(target=addi, args=(num1, num2))
    m.start()

    print("here is main", os.getpid())
    m.join()


if __name__ == "__main__":
    # creating processes
    calc(5, 6)

Output

here is main 9924
11

Method 2

You can switch to multiprocess, which uses dill instead of pickle and can serialize such functions.

import multiprocess as mp  # Note that we are importing "multiprocess", no "ing"!
import os

def calc(num1, num2):

    def addi(num1, num2):
        print(num1 + num2)

    m = mp.Process(target=addi, args=(num1, num2))
    m.start()

    print("here is main", os.getpid())
    m.join()


if __name__ == "__main__":
    # creating processes
    calc(5, 6)

Output

here is main 67632
11

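Method 2b

Although it is a useful library, there are some valid reasons why you may not want to use multiprocess. An important one is that the standard library's multiprocessing and this fork are not compatible with each other (especially if you use anything from the subpackage multiprocessing.managers). This means that if you use this fork in your own project while also using third-party libraries that themselves use the standard library's multiprocessing, you may see unexpected behavior.

Anyway, if you want to stick with the standard library's multiprocessing and not use the multiprocess fork, you can use dill yourself to serialize Python closures such as the function addi, by subclassing the Process class and adding some logic of our own. An example is given below: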

import dill
from multiprocessing import Process  # Use the standard library only
import os

class DillProcess(Process):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._target = dill.dumps(self._target)  # Save the target function as bytes, using dill

    def run(self):
        if self._target:
            self._target = dill.loads(self._target)    # Unpickle the target function before executing
            self._target(*self._args, **self._kwargs)  # Execute the target function


def calc(num1, num2):

    def addi(num1, num2):
        print(num1 + num2)

    m = DillProcess(target=addi, args=(num1, num2))  # Note how we use DillProcess, and not multiprocessing.Process
    m.start()

    print("here is main", os.getpid())
    m.join()


if __name__ == "__main__":
    # creating processes
    calc(5, 6)

Output

here is main 23360
11

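Method 3

This method is for those who cannot use any third-party libraries in their code. I recommend confirming that the methods above do not work for you before resorting to this one, because it is a little hacky and you do need to restructure some of your code.

Anyway, this method works by referencing your local functions from the top-level module scope so that pickle can reach them. To do this dynamically, we create a placeholder class and add all the local functions as its class attributes. We also need to make sure that each function's __qualname__ attribute is changed to point to its new location, and that all of this happens on every run outside the if __name__ ... block (otherwise newly started processes will not see these attributes). Consider this slightly modified version of your code: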

import multiprocessing as mp
import os

def calc(num1, num2):

    def addi(num1, num2):
        print(num1 + num2)

    # Another local function you might have
    def addi2():
        print('hahahaha')

    m = mp.Process(target=addi, args=(num1, num2))
    m.start()

    print("here is main", os.getpid())
    m.join()


if __name__ == "__main__":
    # creating processes
    calc(5, 6)

Here is how to make it work using the method detailed above:

import multiprocessing as mp
import os


# This is our placeholder class, all local functions will be added as its attributes
class _LocalFunctions:
    @classmethod
    def add_functions(cls, *args):
        for function in args:
            setattr(cls, function.__name__, function)
            function.__qualname__ = cls.__qualname__ + '.' + function.__name__


def calc(num1, num2, _init=False):
    # The _init parameter is to initialize all local functions outside __main__ block without actually running the 
    # whole function. Basically, you shift all local function definitions to the top and add them to our 
    # _LocalFunctions class. Now, if the _init parameter is True, then this means that the function call was just to 
    # initialize the local functions and you SHOULD NOT do anything else. This means that after they are initialized,
    # you simply return (check below)

    def addi(num1, num2):
        print(num1 + num2)

    # Another local function you might have
    def addi2():
        print('hahahaha')

    # Add all functions to _LocalFunctions class, separating each with a comma:
    _LocalFunctions.add_functions(addi, addi2)

    # IMPORTANT: return and don't actually execute the logic of the function if _init is True!
    if _init is True:
        return

    # Beyond here is where you put the function's actual logic including any assertions, etc.
    m = mp.Process(target=addi, args=(num1, num2))
    m.start()

    print("here is main", os.getpid())
    m.join()


# All factory functions must be initialized BEFORE the "if __name__ ..." clause. If they require any parameters,
# substitute with bogus ones and make sure to put the _init parameter value as True!
calc(0, 0, _init=True)

if __name__ == '__main__':
    a = calc(5, 6)

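So you need to change a few things in your code: all local functions are defined at the top of their enclosing function, and all factory functions need to be initialized (which is why they need to accept the _init parameter) outside the if __name__ ... block. But this is probably the best you can do if you can't use dill.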


0 votes

Call set_start_method('fork') in main.
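
A sketch of what this one-line answer suggests, assuming a Unix-like system where the fork start method is available (it is not available on Windows). With fork, the child inherits the parent's memory, so the target function never has to be pickled. On macOS, Python 3.8 changed the default start method from fork to spawn, which may be why the question's code fails there. The example uses multiprocessing.get_context("fork") instead of set_start_method('fork') so that it does not change the global start method; run_with_fork is a name made up for this illustration.

```python
import multiprocessing as mp


def run_with_fork(num1, num2):
    """Start a locally defined target under the 'fork' start method.

    Returns the child's exit code, or None if 'fork' is unavailable.
    """
    if "fork" not in mp.get_all_start_methods():
        return None  # e.g. on Windows

    def addi(a, b):
        print(a + b)

    # A context object behaves like the multiprocessing module itself,
    # but with the start method fixed to 'fork'.
    ctx = mp.get_context("fork")
    proc = ctx.Process(target=addi, args=(num1, num2))
    proc.start()
    proc.join()
    return proc.exitcode


if __name__ == "__main__":
    print("exit code:", run_with_fork(5, 6))
```

Keep in mind that the Python documentation describes fork as unsafe on macOS and in multithreaded programs, so this is a workaround rather than a general fix.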

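0 votes

I think I can add to this question, since I have just solved a very similar problem. Sometimes creating a global function is impossible (or inefficient). I think an example explains best what I mean. Say you have a function baz that holds some variables, and these do not change. Say you want to run some function foo that takes several parameters, of which only one varies.

In code it looks like this: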

from multiprocessing import Pool

def foo(x,y,z):
   # do what ever here 
   return x+y+z

def baz():
   x = 5
   y = 25
   zs = [1,2,3,4,5]
   unary = lambda z: foo(x,y,z)
   with Pool() as pool:
      results = pool.imap_unordered(unary, zs)
      for result in results:
         print(result)  # whatever you do with result goes here

But this does not work, because unary is a lambda defined locally and so cannot be pickled. Instead, we should use partial from functools:

from multiprocessing import Pool
from functools import partial

def foo(x,y,z):
   # do what ever here 
   return x+y+z

def baz():
   x = 5
   y = 25
   zs = [1,2,3,4,5]
   unary = partial(foo, x, y)
   with Pool() as pool:
      results = pool.imap_unordered(unary, zs)
      for result in results:
         print(result)  # whatever you do with result goes here

This works and solves the problem.
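
The difference between the two versions can be checked directly with pickle, without starting a Pool. This is only a sketch: can_pickle and build_callables are helper names invented here, and foo is the same function as in the answer. A partial object pickles as long as the wrapped function and the bound arguments are themselves picklable.

```python
import pickle
from functools import partial


def foo(x, y, z):
    return x + y + z


def can_pickle(obj):
    """Return True if pickle.dumps(obj) succeeds (hypothetical helper)."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError):
        return False


def build_callables(x, y):
    """Return (lambda_version, partial_version); both fix x and y."""
    as_lambda = lambda z: foo(x, y, z)  # local object, not picklable
    as_partial = partial(foo, x, y)     # stored as "foo plus bound args"
    return as_lambda, as_partial


if __name__ == "__main__":
    as_lambda, as_partial = build_callables(5, 25)
    print(as_lambda(1), as_partial(1))  # prints: 31 31
    print("lambda picklable:", can_pickle(as_lambda))
    print("partial picklable:", can_pickle(as_partial))
```

Both callables compute the same result; only the partial version can be sent to worker processes by the standard library's Pool.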
