A related question came up in "Why can't I use multiprocessing.Queue with ProcessPoolExecutor?". I provided a partial answer along with a workaround, but admitted that the question raised another question: why can a multiprocessing.Queue instance be passed as an argument to a multiprocessing.Process worker function?
For example, the following code fails on platforms that create new processes with either the spawn or fork method:
from multiprocessing import Pool, Queue

def worker(q):
    print(q.get())

if __name__ == '__main__':
    with Pool(1) as pool:
        q = Queue()
        q.put(7)
        pool.apply(worker, args=(q,))
The above raises:
RuntimeError: Queue objects should only be shared between processes through inheritance
But the following program runs without a problem:
from multiprocessing import Process, Queue

def worker(q):
    print(q.get())

if __name__ == '__main__':
    q = Queue()
    q.put(7)
    p = Process(target=worker, args=(q,))
    p.start()
    p.join()
It seems that the arguments to a pool worker function are ultimately put on the pool's input queue, which is implemented as a multiprocessing.SimpleQueue, and you cannot put a multiprocessing.Queue instance into a multiprocessing.SimpleQueue instance, which uses a ForkingPickler for serialization.
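This can be checked without a pool at all: putting a Queue onto a SimpleQueue forces it through the ForkingPickler, which raises the same error (a minimal sketch; no child process is started):

```python
from multiprocessing import Queue, SimpleQueue

sq = SimpleQueue()  # same type as the pool's internal task queue
q = Queue()
try:
    sq.put(q)  # SimpleQueue pickles items with ForkingPickler before writing
except RuntimeError as e:
    print(e)  # Queue objects should only be shared between processes through inheritance
```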
So how is a multiprocessing.Queue serialized when passed as an argument to a multiprocessing.Process, allowing it to be used in this way?
I wanted to expand on the accepted answer, so I am adding my own, which also details a way to make queues, locks, etc. picklable and sendable through a pool.
Basically, it's not that Queue instances cannot be serialized. It's that multiprocessing is only equipped to serialize them when it knows sufficient information about the target process they are being sent to (whether that is the current process or some other process), which is why it works when you are spawning a process yourself (using the Process class) but not when you are simply putting one in a queue (like when using Pool).
Look over the source code for multiprocessing.queues.Queue (or other connection objects, such as Condition). You'll find that in their __getstate__ method (the method called when a Queue instance is being pickled), there is a call to the function multiprocessing.context.assert_spawning. This "assertion" only passes if the current thread is spawning a process. If that is not the case, multiprocessing raises the error you see and exits.
Now, the reason multiprocessing does not even bother to pickle the queue when the assertion fails is that it does not have access to the Popen object that is created when a thread spawns a subprocess (for Windows, you can find it at multiprocessing.popen_spawn_win32.Popen). This object stores data about the target process, including its pid and process handle. Multiprocessing requires this information because a queue contains mutexes, and to successfully pickle these and rebuild them again later, multiprocessing must call DuplicateHandle through winapi with the information from the Popen object. If this object is not present, multiprocessing does not know what to do and raises an error. So this is where our problem lies, but it is fixable if we can teach multiprocessing a different approach: to steal the duplicated handle from within the target process itself, without ever requiring the target process's information in advance.
Making the queue picklable
Pay attention to the class multiprocessing.synchronize.SemLock. It's the base class for all multiprocessing locks, so its objects are subsequently present in queues, pipes, etc. The way it's currently pickled is the way I described above: it requires the target process's handle to create a duplicate handle. However, we can instead define a __reduce__ method for SemLock where we create a duplicate handle using the current process's own handle, and then, from the target process, duplicate the previously created handle, which will now be valid in the target process's context. It's quite a mouthful, but a similar approach is actually used to pickle PipeConnection objects as well, except that instead of a __reduce__ method, it uses a dispatch table to do so.
After this is done, we can subclass Queue and remove the call to assert_spawning, since it will no longer be required. This way, we will now be able to successfully pickle locks, queues, pipes, etc. Here's the code with examples:
import os, pickle
from multiprocessing import Pool, Lock, synchronize, get_context
import multiprocessing.queues
import _winapi


def work(q):
    print("Worker: Main says", q.get())
    q.put('haha')


class DupSemLockHandle(object):
    """
    Picklable wrapper for a handle. Attempts to mirror how PipeConnection objects are pickled using appropriate api
    """

    def __init__(self, handle, pid=None):
        if pid is None:
            # We just duplicate the handle in the current process and
            # let the receiving process steal the handle.
            pid = os.getpid()
        proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid)
        try:
            self._handle = _winapi.DuplicateHandle(
                _winapi.GetCurrentProcess(),
                handle, proc, 0, False, _winapi.DUPLICATE_SAME_ACCESS)
        finally:
            _winapi.CloseHandle(proc)
        self._pid = pid

    def detach(self):
        """
        Get the handle, typically from another process
        """
        # retrieve handle from process which currently owns it
        if self._pid == os.getpid():
            # The handle has already been duplicated for this process.
            return self._handle
        # We must steal the handle from the process whose pid is self._pid.
        proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False,
                                   self._pid)
        try:
            return _winapi.DuplicateHandle(
                proc, self._handle, _winapi.GetCurrentProcess(),
                0, False, _winapi.DUPLICATE_CLOSE_SOURCE | _winapi.DUPLICATE_SAME_ACCESS)
        finally:
            _winapi.CloseHandle(proc)


def reduce_lock_connection(self):
    sl = self._semlock
    dh = DupSemLockHandle(sl.handle)
    return rebuild_lock_connection, (dh, type(self), (sl.kind, sl.maxvalue, sl.name))


def rebuild_lock_connection(dh, t, state):
    handle = dh.detach()  # Duplicated handle valid in current process's context
    # Create a new instance without calling __init__ because we'll supply the state ourselves
    lck = t.__new__(t)
    lck.__setstate__((handle,) + state)
    return lck


# Add our own reduce function to pickle SemLock and its child classes
synchronize.SemLock.__reduce__ = reduce_lock_connection


class PicklableQueue(multiprocessing.queues.Queue):
    """
    A picklable Queue that skips the call to context.assert_spawning because it's no longer needed
    """

    def __init__(self, *args, **kwargs):
        ctx = get_context()
        super().__init__(*args, **kwargs, ctx=ctx)

    def __getstate__(self):
        return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
                self._rlock, self._wlock, self._sem, self._opid)


def is_locked(l):
    """
    Returns whether the given lock is acquired or not.
    """
    locked = l.acquire(block=False)
    if locked is False:
        return True
    else:
        l.release()
        return False


if __name__ == '__main__':
    # Example that shows that you can now pickle/unpickle locks and they'll still point towards the same object
    l1 = Lock()
    p = pickle.dumps(l1)
    l2 = pickle.loads(p)
    print('before acquiring, l1 locked:', is_locked(l1), 'l2 locked', is_locked(l2))
    l2.acquire()
    print('after acquiring l1 locked:', is_locked(l1), 'l2 locked', is_locked(l2))

    # Example that shows how you can pass a queue to Pool and it will work
    with Pool() as pool:
        q = PicklableQueue()
        q.put('laugh')
        pool.map(work, (q,))
        print("Main: Worker says", q.get())
Output:
before acquiring, l1 locked: False l2 locked False
after acquiring l1 locked: True l2 locked True
Worker: Main says laugh
Main: Worker says haha
Note: The above code will only work on Windows. If you are on UNIX, you may try using @Booboo's modified code below (reported to work, but has not been adequately tested; full code link here):
import os, pickle
from multiprocessing import Pool, Lock, synchronize, get_context, Process
import multiprocessing.queues
import sys

_is_windows = sys.platform == 'win32'
if _is_windows:
    import _winapi

.
.
.

class DupSemLockHandle(object):
    """
    Picklable wrapper for a handle. Attempts to mirror how PipeConnection objects are pickled using appropriate api
    """

    def __init__(self, handle, pid=None):
        if pid is None:
            # We just duplicate the handle in the current process and
            # let the receiving process steal the handle.
            pid = os.getpid()
        if _is_windows:
            proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid)
            try:
                self._handle = _winapi.DuplicateHandle(
                    _winapi.GetCurrentProcess(),
                    handle, proc, 0, False, _winapi.DUPLICATE_SAME_ACCESS)
            finally:
                _winapi.CloseHandle(proc)
        else:
            self._handle = handle
        self._pid = pid

    def detach(self):
        """
        Get the handle, typically from another process
        """
        # retrieve handle from process which currently owns it
        if self._pid == os.getpid():
            # The handle has already been duplicated for this process.
            return self._handle
        if not _is_windows:
            return self._handle
        # We must steal the handle from the process whose pid is self._pid.
        proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False,
                                   self._pid)
        try:
            return _winapi.DuplicateHandle(
                proc, self._handle, _winapi.GetCurrentProcess(),
                0, False, _winapi.DUPLICATE_CLOSE_SOURCE | _winapi.DUPLICATE_SAME_ACCESS)
        finally:
            _winapi.CloseHandle(proc)
When a multiprocessing.Queue is serialized for the multiprocessing.Process.run method, it is not the queue itself that gets serialized. The queue is implemented with a pipe (represented by a file descriptor) and a lock that serializes access to the pipe. Duplication of the file descriptor happens only by inheriting descriptors when the subprocess is spawned, or through platform-specific calls that require knowledge of both processes. The original queue can then be reconstructed from the file descriptor.
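The descriptor-inheritance part of this can be seen with a bare os.fork on POSIX systems: a pipe's file descriptors created in the parent remain valid in the forked child without any serialization (a POSIX-only sketch; os.fork does not exist on Windows):

```python
import os

r, w = os.pipe()
pid = os.fork()          # child inherits both file descriptors
if pid == 0:             # child process
    os.write(w, b"hi")
    os._exit(0)
print(os.read(r, 2))     # b'hi'
os.waitpid(pid, 0)
```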