不能pickle静态方法 - 多处理 - Python

问题描述 投票:8回答:3

我正在对我的代码应用一些并行化,我在其中使用类。我知道如果不采用Python提供的任何其他方法,就不可能选择类方法。我找到了解决方案here。在我的代码中,我必须使用类来并行化部分。在这里,我发布了一个非常简单的代码,只是代表我的结构(是相同的,但我删除了方法内容,这是很多数学计算,对于我得到的输出而言是微不足道的)。问题是'因为我可以腌制一种方法(shepard_interpolation),但是另一种方法(calculate_orientation_uncertainty)我得到了泡菜错误。我不知道为什么会这样,或者为什么它会起作用。

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    if func_name.startswith('__') and not func_name.endswith('__'): #deal with mangled names
        cls_name = cls.__name__.lstrip('_')
        func_name = '_' + cls_name + func_name
    print cls
    return _unpickle_method, (func_name, obj, cls)


def _unpickle_method(func_name, obj, cls):
    for cls in cls.__mro__:
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)

class ImageData(object):

    def __init__(self, width=60, height=60):
        self.width = width
        self.height = height
        self.data = []
        for i in range(width):
            self.data.append([0] * height)

    def shepard_interpolation(self, seeds=20):
        print "ImD - Sucess"       

import copy_reg
import types
from itertools import product
from multiprocessing import Pool

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

class VariabilityOfGradients(object):
    def __init__(self):
        pass

    @staticmethod
    def aux():
        return "VoG - Sucess" 

    @staticmethod
    def calculate_orientation_uncertainty():
        results = []
        pool = Pool()
        for x, y in product(range(1, 5), range(1, 5)):
            result = pool.apply_async(VariabilityOfGradients.aux) 
        results.append(result.get())
        pool.close()
        pool.join()        


if __name__ == '__main__':  
    results = []
    pool = Pool()
    for _ in range(3):
        result = pool.apply_async(ImageData.shepard_interpolation, args=[ImageData()])
        results.append(result.get())
    pool.close()
    pool.join()

    VariabilityOfGradients.calculate_orientation_uncertainty()   

在运行时,我得到了“PicklingError:无法pickle:属性查找builtin.function失败”。而这几乎与here相同。我看到的唯一区别是我的方法是静态的。

编辑:

我注意到在我的calculate_orientation_uncertainty中,当我将函数称为result = pool.apply_async(VariabilityOfGradients.aux())时,即使用括号(在doc示例中我从未见过这个),它似乎有效。但是,当我试图得到结果时,我收到“TypeError:'int'对象不可调用”...

任何帮助,将不胜感激。先感谢您。

python class multiprocessing pickle pool
3个回答
9
投票

您可以在模块级别定义普通函数,也可以定义静态方法。这保留了static方法的调用语法,内省和可继承性功能,同时避免了酸洗问题:

def aux():
    return "VoG - Sucess" 

class VariabilityOfGradients(object):
    aux = staticmethod(aux)

例如,

import copy_reg
import types
from itertools import product
import multiprocessing as mp

def _pickle_method(method):
    """
    Author: Steven Bethard (author of argparse)
    http://bytes.com/topic/python/answers/552476-why-cant-you-pickle-instancemethods
    """
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    cls_name = ''
    if func_name.startswith('__') and not func_name.endswith('__'):
        cls_name = cls.__name__.lstrip('_')
    if cls_name:
        func_name = '_' + cls_name + func_name
    return _unpickle_method, (func_name, obj, cls)


def _unpickle_method(func_name, obj, cls):
    """
    Author: Steven Bethard
    http://bytes.com/topic/python/answers/552476-why-cant-you-pickle-instancemethods
    """
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)

class ImageData(object):

    def __init__(self, width=60, height=60):
        self.width = width
        self.height = height
        self.data = []
        for i in range(width):
            self.data.append([0] * height)

    def shepard_interpolation(self, seeds=20):
        print "ImD - Success"       

def aux():
    return "VoG - Sucess" 

class VariabilityOfGradients(object):
    aux = staticmethod(aux)

    @staticmethod
    def calculate_orientation_uncertainty():
        pool = mp.Pool()
        results = []
        for x, y in product(range(1, 5), range(1, 5)):
            # result = pool.apply_async(aux) # this works too
            result = pool.apply_async(VariabilityOfGradients.aux, callback=results.append)
        pool.close()
        pool.join()
        print(results)


if __name__ == '__main__':  
    results = []
    pool = mp.Pool()
    for _ in range(3):
        result = pool.apply_async(ImageData.shepard_interpolation, args=[ImageData()])
        results.append(result.get())
    pool.close()
    pool.join()

    VariabilityOfGradients.calculate_orientation_uncertainty()   

产量

ImD - Success
ImD - Success
ImD - Success
['VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess', 'VoG - Sucess']

顺便说一句,result.get()阻止调用过程,直到pool.apply_async调用的函数(例如ImageData.shepard_interpolation)完成。所以

for _ in range(3):
    result = pool.apply_async(ImageData.shepard_interpolation, args=[ImageData()])
    results.append(result.get())

实际上是按顺序调用ImageData.shepard_interpolation,打败了游泳池的目的。

相反,你可以使用

for _ in range(3):
    pool.apply_async(ImageData.shepard_interpolation, args=[ImageData()],
                     callback=results.append)

当函数完成时,在调用进程的线程中调用回调函数(例如results.append)。它被发送一个参数 - 函数的返回值。因此,没有什么可以阻止三个pool.apply_async调用快速进行,并且三个调用ImageData.shepard_interpolation所做的工作将同时执行。

或者,在这里使用pool.map可能更简单。

results = pool.map(ImageData.shepard_interpolation, [ImageData()]*3)

5
投票

如果你使用multiprocessingpathos.multiprocesssing分支,你可以在多处理的map函数中直接使用类和类方法。这是因为使用dill而不是picklecPickle,而dill可以在python中序列化几乎任何东西。

pathos.multiprocessing还提供异步映射函数...它可以使用多个参数的map函数(例如map(math.pow, [1,2,3], [4,5,6])

见:What can multiprocessing and dill do together?

并且:http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization/

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> 
>>> p = Pool(4)
>>> 
>>> def add(x,y):
...   return x+y
... 
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>> 
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>> 
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> 
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>> 
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]

获取代码:https://github.com/uqfoundation/pathos

pathos也有一个异步地图(amap),以及imap


0
投票

我不确定这是不是你要找的,但我的用法略有不同。我想使用在多个线程上运行的同一个类中的类的方法。

这是我实现它的方式:

from multiprocessing import Pool

class Product(object):

        def __init__(self):
                self.logger = "test"

        def f(self, x):
                print(self.logger)
                return x*x

        def multi(self):
                p = Pool(5)
                print(p.starmap(Product.f, [(Product(), 1), (Product(), 2), (Product(), 3)]))


if __name__ == '__main__':
        obj = Product()
        obj.multi()
© www.soinside.com 2019 - 2024. All rights reserved.