How is a multiprocessing.Queue instance serialized when passed as an argument to a multiprocessing.Process?

Votes: 0 · Answers: 2

A related question arose in Why can't I use multiprocessing.Queue with ProcessPoolExecutor?. I provided a partial answer along with a workaround there, but admitted that the question raised another question: why can a multiprocessing.Queue instance be passed as an argument to a multiprocessing.Process worker function?

For example, the following code fails on platforms that create new processes with either the spawn or fork method:

from multiprocessing import Pool, Queue

def worker(q):
    print(q.get())

with Pool(1) as pool:
    q = Queue()
    q.put(7)
    pool.apply(worker, args=(q,))

The above raises:

RuntimeError: Queue objects should only be shared between processes through inheritance

But the following program runs without a problem:

from multiprocessing import Process, Queue

def worker(q):
    print(q.get())

q = Queue()
q.put(7)
p = Process(target=worker, args=(q,))
p.start()
p.join()

It appears that arguments to a multiprocessing pool worker function ultimately get placed on the pool's input queue, which is implemented as a multiprocessing.SimpleQueue, and you cannot put a multiprocessing.Queue instance on a multiprocessing.SimpleQueue instance, which uses a ForkingPickler for serialization.
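This can be checked directly. A minimal sketch, assuming CPython's multiprocessing.reduction.ForkingPickler (the pickler mentioned above) is used the same way the pool's task handler would use it:

```python
import io
from multiprocessing import Queue
from multiprocessing.reduction import ForkingPickler

# Attempt to serialize a Queue the way the pool's task machinery would.
q = Queue()
buf = io.BytesIO()
try:
    ForkingPickler(buf).dump(q)
except RuntimeError as e:
    print(e)  # Queue objects should only be shared between processes through inheritance
```

The same RuntimeError from the first code snippet appears, without any pool involved, which supports the idea that it is the pickling of the task arguments that fails.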

So how is a multiprocessing.Queue instance serialized when passed as an argument to a multiprocessing.Process, allowing it to be used in that way?

python multiprocessing queue
2 Answers

6 votes

I'd like to expand on the accepted answer, so I have added my own, which also details a way to make queues, locks, etc. picklable and able to be sent through a pool.

Why this happens

Basically, it isn't that Queue instances cannot be serialized; it's just that multiprocessing is only equipped to serialize them when it knows sufficient information about the target process they are being sent to (whether that is the current process or some other process), which is why it works when you are spawning a process yourself (using the Process class) but not when you are simply putting one on a queue (as when using a Pool).

Look at the source code for multiprocessing.queues.Queue (or other connection objects, such as Condition). You will find that their __getstate__ method (the method called when a Queue instance is being pickled) contains a call to the function multiprocessing.context.assert_spawning. This "assertion" only passes if the current thread is spawning a process. If that is not the case, multiprocessing raises the error you see and exits.
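A short sketch of that failing path (get_spawning_popen and assert_spawning are internal CPython helpers in multiprocessing.context, not public API, so this is for illustration only):

```python
import pickle
from multiprocessing import Queue
from multiprocessing.context import get_spawning_popen

# Outside of process creation, no Popen object is recorded for the current
# thread, so assert_spawning has nothing to check against.
print(get_spawning_popen())  # None

try:
    pickle.dumps(Queue())    # __getstate__ -> assert_spawning -> RuntimeError
except RuntimeError as e:
    print(e)
```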
Now, the reason multiprocessing does not even bother to pickle the queue when the assertion fails is that it has no access to the Popen object created when a thread spawns a child process (on Windows, you can find it in multiprocessing.popen_spawn_win32.Popen). This object stores data about the target process, including its pid and process handle. Multiprocessing requires this information because a queue contains mutexes, and to successfully pickle them and later rebuild them, multiprocessing must call DuplicateHandle through winapi using the information from the Popen object. If this object is not present, multiprocessing does not know what to do and raises an error. So this is where our problem lies, but it is solvable if we can teach multiprocessing a different approach: stealing the duplicate handle from within the target process itself, without requiring the target process's information in advance.
Making the queue picklable

Pay attention to the class multiprocessing.synchronize.SemLock. It is the base class for all multiprocessing locks, so its objects are subsequently present in queues, pipes, etc. The way it is currently pickled is just as I described above: it requires the target process's handle to create a duplicate handle. However, we can instead define a __reduce__ method for SemLock in which we create a duplicate handle using the current process's own handle, and then, from the target process, duplicate the previously created handle, which will now be valid in the target process's context. It's quite a mouthful, but a similar approach is actually used to pickle PipeConnection objects as well, except that it uses a dispatch table to do so instead of a __reduce__ method.

Once this is done, we can subclass Queue and remove the call to assert_spawning, since it is no longer required. This way we will now be able to successfully pickle locks, queues, pipes, etc. Here is the code with examples:

import os, pickle
from multiprocessing import Pool, Lock, synchronize, get_context
import multiprocessing.queues
import _winapi


def work(q):
    print("Worker: Main says", q.get())
    q.put('haha')


class DupSemLockHandle(object):
    """
    Picklable wrapper for a handle. Attempts to mirror how PipeConnection objects are pickled using appropriate api
    """

    def __init__(self, handle, pid=None):
        if pid is None:
            # We just duplicate the handle in the current process and
            # let the receiving process steal the handle.
            pid = os.getpid()
        proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid)
        try:
            self._handle = _winapi.DuplicateHandle(
                _winapi.GetCurrentProcess(),
                handle, proc, 0, False, _winapi.DUPLICATE_SAME_ACCESS)
        finally:
            _winapi.CloseHandle(proc)
        self._pid = pid

    def detach(self):
        """
        Get the handle, typically from another process
        """
        # retrieve handle from process which currently owns it
        if self._pid == os.getpid():
            # The handle has already been duplicated for this process.
            return self._handle
        # We must steal the handle from the process whose pid is self._pid.
        proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False,
                                   self._pid)
        try:
            return _winapi.DuplicateHandle(
                proc, self._handle, _winapi.GetCurrentProcess(),
                0, False, _winapi.DUPLICATE_CLOSE_SOURCE | _winapi.DUPLICATE_SAME_ACCESS)
        finally:
            _winapi.CloseHandle(proc)


def reduce_lock_connection(self):
    sl = self._semlock
    dh = DupSemLockHandle(sl.handle)
    return rebuild_lock_connection, (dh, type(self), (sl.kind, sl.maxvalue, sl.name))


def rebuild_lock_connection(dh, t, state):
    handle = dh.detach()  # Duplicated handle valid in current process's context

    # Create a new instance without calling __init__ because we'll supply the state ourselves
    lck = t.__new__(t)
    lck.__setstate__((handle,)+state)
    return lck


# Add our own reduce function to pickle SemLock and its child classes
synchronize.SemLock.__reduce__ = reduce_lock_connection


class PicklableQueue(multiprocessing.queues.Queue):
    """
    A picklable Queue that skips the call to context.assert_spawning because it's no longer needed
    """

    def __init__(self, *args, **kwargs):
        ctx = get_context()
        super().__init__(*args, **kwargs, ctx=ctx)

    def __getstate__(self):

        return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
                self._rlock, self._wlock, self._sem, self._opid)


def is_locked(l):
    """
    Returns whether the given lock is acquired or not.
    """
    locked = l.acquire(block=False)
    if locked is False:
        return True
    else:
        l.release()
        return False


if __name__ == '__main__':

    # Example that shows that you can now pickle/unpickle locks and they'll still point towards the same object
    l1 = Lock()
    p = pickle.dumps(l1)
    l2 = pickle.loads(p)
    print('before acquiring, l1 locked:', is_locked(l1), 'l2 locked', is_locked(l2))
    l2.acquire()
    print('after acquiring l1 locked:', is_locked(l1), 'l2 locked', is_locked(l2))

    # Example that shows how you can pass a queue to Pool and it will work
    with Pool() as pool:

        q = PicklableQueue()
        q.put('laugh')
        pool.map(work, (q,))
        print("Main: Worker says", q.get())

Output:

before acquiring, l1 locked: False l2 locked False
after acquiring l1 locked: True l2 locked True
Worker: Main says laugh
Main: Worker says haha

Disclaimer: the above code only works on Windows. If you are on UNIX, you may try using @Booboo's modified code below (reported working, but not sufficiently tested; full code linked here):

import os, pickle
from multiprocessing import Pool, Lock, synchronize, get_context, Process
import multiprocessing.queues
import sys

_is_windows = sys.platform == 'win32'
if _is_windows:
    import _winapi

. . .

class DupSemLockHandle(object):
    """
    Picklable wrapper for a handle. Attempts to mirror how PipeConnection objects are pickled using appropriate api
    """

    def __init__(self, handle, pid=None):
        if pid is None:
            # We just duplicate the handle in the current process and
            # let the receiving process steal the handle.
            pid = os.getpid()
        if _is_windows:
            proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid)
            try:
                self._handle = _winapi.DuplicateHandle(
                    _winapi.GetCurrentProcess(),
                    handle, proc, 0, False, _winapi.DUPLICATE_SAME_ACCESS)
            finally:
                _winapi.CloseHandle(proc)
        else:
            self._handle = handle
        self._pid = pid

    def detach(self):
        """
        Get the handle, typically from another process
        """
        # retrieve handle from process which currently owns it
        if self._pid == os.getpid():
            # The handle has already been duplicated for this process.
            return self._handle
        if not _is_windows:
            return self._handle
        # We must steal the handle from the process whose pid is self._pid.
        proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False,
                                   self._pid)
        try:
            return _winapi.DuplicateHandle(
                proc, self._handle, _winapi.GetCurrentProcess(),
                0, False, _winapi.DUPLICATE_CLOSE_SOURCE | _winapi.DUPLICATE_SAME_ACCESS)
        finally:
            _winapi.CloseHandle(proc)



3 votes
When a multiprocessing.Queue is serialized for the multiprocessing.Process.run method, it is not the queue itself that gets serialized. The queue is implemented by a pipe (represented by a file descriptor) and a lock that serializes access to the pipe.

Duplicating a file descriptor only happens either by inheriting the descriptor when the child process is spawned, or through platform-specific calls that require knowledge of both processes. The original queue can then be reconstructed from the file descriptor.
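These internals can be inspected directly. A minimal sketch, noting that the _reader, _writer, _rlock, and _sem attributes are CPython implementation details rather than public API:

```python
from multiprocessing import Queue

q = Queue()
# The queue wraps a pipe: a reader and a writer Connection, each backed by a
# file descriptor (on UNIX), plus synchronization primitives guarding access.
print(type(q._reader).__name__, type(q._writer).__name__)
print(q._reader.fileno(), q._writer.fileno())  # two distinct descriptors
print(q._rlock, q._sem)                        # lock and bounded semaphore
```

Pickling the queue therefore amounts to transferring those descriptors and locks to the other process, which is exactly the part that needs process-specific knowledge.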

© www.soinside.com 2019 - 2024. All rights reserved.