在Python进程之间共享复杂对象?

问题描述 投票:34回答:6

我有一个相当复杂的Python对象,我需要在多个进程之间共享。我使用multiprocessing.Process启动这些流程。当我与multiprocessing.Queuemultiprocessing.Pipe共享一个对象时,它们分享得很好。但是当我尝试与其他非多处理模块对象共享一个对象时,似乎Python会分叉这些对象。真的吗?

我尝试使用multiprocessing.Value。但我不确定应该是什么类型的?我的对象类叫做MyClass。但是当我尝试multiprocess.Value(MyClass, instance)时,它失败了:

TypeError: this type has no size

知道发生了什么事吗?

python process multiprocessing sharing
6个回答
28
投票

您可以使用Python的Multiprocessing“Manager”类和您定义的代理类来完成此操作。来自Python文档:http://docs.python.org/library/multiprocessing.html#proxy-objects

您要做的是为自定义对象定义代理类,然后使用“远程管理器”共享对象 - 查看“远程管理器”的相同链接文档页面中的示例,其中文档显示如何共享一个远程队列。你将会做同样的事情,但是你对your_manager_instance.register()的调用将在你的参数列表中包含你的自定义代理类。

通过这种方式,您将设置一个服务器以与自定义代理共享自定义对象。您的客户端需要访问服务器(再次,请参阅如何设置客户端/服务器访问远程队列的优秀文档示例,但您不是共享队列,而是共享对特定类的访问权限)。


26
投票

经过大量的研究和测试,我发现“经理”在非复杂的对象层面上完成这项工作。

下面的代码显示对象inst在进程之间共享,这意味着当子进程更改时,var的属性inst会在外部更改。

from multiprocessing import Process, Manager
from multiprocessing.managers import BaseManager

class SimpleClass(object):
    def __init__(self):
        self.var = 0

    def set(self, value):
        self.var = value

    def get(self):
        return self.var


def change_obj_value(obj):
    obj.set(100)


if __name__ == '__main__':
    BaseManager.register('SimpleClass', SimpleClass)
    manager = BaseManager()
    manager.start()
    inst = manager.SimpleClass()

    p = Process(target=change_obj_value, args=[inst])
    p.start()
    p.join()

    print inst                    # <__main__.SimpleClass object at 0x10cf82350>
    print inst.get()              # 100

好的,如果你只需要共享简单的对象,上面的代码就足够了。

为什么没有复杂?因为如果对象是嵌套的(对象内部对象),它可能会失败:

from multiprocessing import Process, Manager
from multiprocessing.managers import BaseManager

class GetSetter(object):
    def __init__(self):
        self.var = None

    def set(self, value):
        self.var = value

    def get(self):
        return self.var


class ChildClass(GetSetter):
    pass

class ParentClass(GetSetter):
    def __init__(self):
        self.child = ChildClass()
        GetSetter.__init__(self)

    def getChild(self):
        return self.child


def change_obj_value(obj):
    obj.set(100)
    obj.getChild().set(100)


if __name__ == '__main__':
    BaseManager.register('ParentClass', ParentClass)
    manager = BaseManager()
    manager.start()
    inst2 = manager.ParentClass()

    p2 = Process(target=change_obj_value, args=[inst2])
    p2.start()
    p2.join()

    print inst2                    # <__main__.ParentClass object at 0x10cf82350>
    print inst2.getChild()         # <__main__.ChildClass object at 0x10cf6dc50>
    print inst2.get()              # 100
    #good!

    print inst2.getChild().get()   # None
    #bad! you need to register child class too but there's almost no way to do it
    #even if you did register child class, you may get PicklingError :)

我认为这种行为的主要原因是因为Manager只是在管道/队列等低级通信工具之上构建的直板。

因此,这种方法不适用于多处理案例。如果您可以使用锁定/信号量/管道/队列等低级工具或Redis队列或Redis发布/订阅等高级工具来处理复杂的用例(仅限我的推荐lol),那总是更好。


5
投票

这是我为此做的一个python包(在进程之间共享复杂的对象)。

git:https://github.com/dRoje/pipe-proxy

您的想法是为对象创建代理并将其传递给进程。然后使用代理,就像您对原始对象的引用一样。虽然您只能使用方法调用,但是访问对象变量会使setter和getter完成。

假设我们有一个名为'example'的对象,创建代理和代理监听器很容易:

from pipeproxy import proxy 
example = Example() 
exampleProxy, exampleProxyListener = proxy.createProxy(example) 

现在您将代理发送到另一个进程。

p = Process(target=someMethod, args=(exampleProxy,)) p.start()

像使用原始对象一样在其他进程中使用它(示例):

def someMethod(exampleProxy):
    ...
    exampleProxy.originalExampleMethod()
    ...

但你必须在主要过程中听取它:

exampleProxyListener.listen()

阅读更多内容并在此处查找示例

http://matkodjipalo.com/index.php/2017/11/12/proxy-solution-python-multiprocessing/


2
投票

我尝试使用BaseManager并注册我的自定义类以使其满意,并像Tom上面提到的那样得到嵌套类的问题。

我认为主要原因与所说的嵌套类无关,但是python采用低级别的通信机制。原因是python使用一些类似套接字的通信机制来同步在低级别的服务器进程内修改自定义类。我认为它封装了一些rpc方法,使它对用户透明,就像它们调用嵌套类对象的本地方法一样。

因此,当您想要修改,检索自定义对象或某些第三方对象时,您应该在流程中定义一些接口以与之通信,而不是直接获取或设置值。

然而,当在嵌套对象中操作多嵌套对象时,可以忽略上面提到的问题,就像在常规例程中所做的那样,因为已注册类中的嵌套对象不再是代理对象,操作在该对象上将不会再次通过类似套接字的通信例程并进行本地化。

这是我为解决问题而编写的可行代码。

from multiprocessing import Process, Manager, Lock
from multiprocessing.managers import BaseManager
import numpy as np

class NestedObj(object):
       def __init__(self):
                self.val = 1

class CustomObj(object):
        def __init__(self, numpy_obj):
                self.numpy_obj = numpy_obj
                self.nested_obj = NestedObj()

        def set_value(self, p, q, v):
                self.numpy_obj[p, q] = v

        def get_obj(self):
                return self.numpy_obj

        def get_nested_obj(self):
                return self.nested_obj.val

class CustomProcess(Process):
        def __init__(self, obj, p, q, v):
                super(CustomProcess, self).__init__()
                self.obj = obj
                self.index = p, q
                self.v = v

        def run(self):
                self.obj.set_value(*self.index, self.v)



if __name__=="__main__":
        BaseManager.register('CustomObj', CustomObj)
        manager = BaseManager()
        manager.start()
        data = [[0 for x in range(10)] for y in range(10)]
        matrix = np.matrix(data)
        custom_obj = manager.CustomObj(matrix)
        print(custom_obj.get_obj())
        process_list = []
        for p in range(10):
                for q in range(10):
                        proc = CustomProcess(custom_obj, p, q, 10*p+q)
                        process_list.append(proc)
        for x in range(100):
                process_list[x].start()
        for x in range(100):
                process_list[x].join()
        print(custom_obj.get_obj())
        print(custom_obj.get_nested_obj())

0
投票

为了节省一些带有共享资源的麻烦,您可以尝试收集需要访问由例如映射的函数的return语句中的单例资源的数据。 pool.imap_unordered然后在循环中进一步处理它,检索部分结果:

for result in in pool.imap_unordered(process_function, iterable_data):
    do_something(result)

如果返回的数据不多,那么执行此操作可能不会有太多开销。


0
投票

在Python 3.6中,文档说:

在版本3.6中更改:共享对象能够嵌套。例如,共享容器对象(如共享列表)可以包含其他共享对象,这些对象将由SyncManager进行管理和同步。

只要通过SyncManager创建实例,您就应该能够使对象相互引用。在另一种类型的对象的方法中动态创建一种类型的对象可能仍然是不可能的或非常棘手的。

编辑:我偶然发现了这个问题Multiprocessing managers and custom classes与python 3.6.5和3.6.7。需要检查python 3.7

编辑2:由于一些其他问题,我目前无法用python3.7测试它。 https://stackoverflow.com/a/50878600/7541006提供的解决方法对我来说很好

© www.soinside.com 2019 - 2024. All rights reserved.