我有一个 Python 程序启动 N 子进程(客户端),它向主进程(服务器)发送请求并监听响应。进程间通信根据以下方案通过
multiprocessing.Queue
对象使用管道(每个消费者一个队列,因此一个请求队列和 N 响应队列):
1 req_queue
<-- Process-1
MainProcess <-- ============= <-- …
<-- Process-N
N resp_queues
--> ============= --> Process-1
MainProcess --> ============= --> …
--> ============= --> Process-N
(简化)程序:
import multiprocessing
def work(event, req_queue, resp_queue):
while not event.is_set():
name = multiprocessing.current_process().name
x = 3
req_queue.put((name, x))
print(name, 'input:', x)
y = resp_queue.get()
print(name, 'output:', y)
if __name__ == '__main__':
event = multiprocessing.Event()
req_queue = multiprocessing.Queue()
resp_queues = {}
N = 10
for _ in range(N): # start N subprocesses
resp_queue = multiprocessing.Queue()
process = multiprocessing.Process(
target=work, args=(event, req_queue, resp_queue))
resp_queues[process.name] = resp_queue
process.start()
for _ in range(100): # handle 100 requests
(name, x) = req_queue.get()
y = x ** 2
resp_queues[name].put(y)
event.set()
我面临的问题是这个程序(在Python 3.11.2下)的执行有时永远不会停止,一旦主进程通知子进程在
y = resp_queue.get()
行停止,就会在某些子进程中挂在event.set()
行。如果我使用 threading
库而不是 multiprocessing
库,问题是一样的。
如何停止子进程?
queue.get()
是一个阻塞函数,到达它的线程(进程)会一直等到一个项目被放入队列,如果它已经到达get()
线,它不会被设置事件唤醒。
通常的做法(即使在标准模块中)是在队列中发送一个
None
(或另一个无意义的对象)来唤醒在队列中等待的进程,并让它们在没有更多工作时终止。
event.set()
for queue_obj in resp_queues:
queue_obj.put(None)
这使您的事件仅对提前终止有用,但如果不需要提前终止,您可以完全忽略工人的事件。
def work(event, req_queue, resp_queue):
while True:
...
y = resp_queue.get()
print(name, 'output:', y)
if y is None:
break
很明显,如果主进程失败,仅使用
queue.get()
会导致资源泄漏,因此您应该做的另一个解决方案是在队列上使用超时,而不是让它永远等待。
y = resp_queue.get(timeout=0.1)
这确保进程最终会在“意外故障”时终止,但发送
None
是用于即时终止的。
如果您在整个代码中嵌入了多个
resp_queue.get()
,那么一个简单的break on None
将不起作用,那么您可以在收到sys.exit()
时使用None
来终止工作人员,这将进行必要的清理,并且只能被捕获是一个裸except:
,拦截None
并调用sys.exit
的代码可以隐藏在multiprocessing.queues.Queue
的子类中。
class my_queue(multiprocessing.queues.Queue):
def get(self, block: bool = False, timeout: Optional[float] = None) -> object:
# set a default alternative timeout if you want
return_value = super().get(block=block, timeout=timeout)
if return_value is None: # or another dummy class as a signal
sys.exit() # or raise a known exception
return return_value
这是 @AhmedAEK's 的替代解决方案,它适用于
work
函数中开放数量的请求-响应对,即开放数量的 req_queue.put((name, x))
和 y = resp_queue.get()
调用对(在我的真正的程序我不控制工人代码,因为它可以由用户在子类中重新定义):
...
if __name__ == '__main__':
...
event.set()
try:
while True:
(name, x) = req_queue.get(timeout=1)
resp_queues[name].put(None)
except queue.Empty:
pass