如何停止通过请求和响应队列与主进程通信的子进程?

问题描述 投票:0回答:2

我有一个 Python 程序启动 N 子进程(客户端),它向主进程(服务器)发送请求并监听响应。进程间通信根据以下方案通过

multiprocessing.Queue
对象使用管道(每个消费者一个队列,因此一个请求队列和 N 响应队列):

                1 req_queue
                              <-- Process-1
MainProcess <-- ============= <-- …
                              <-- Process-N

                N resp_queues
            --> ============= --> Process-1
MainProcess --> ============= --> …
            --> ============= --> Process-N

(简化)程序:

import multiprocessing


def work(event, req_queue, resp_queue):
    while not event.is_set():
        name = multiprocessing.current_process().name
        x = 3
        req_queue.put((name, x))
        print(name, 'input:', x)
        y = resp_queue.get()
        print(name, 'output:', y)


if __name__ == '__main__':
    event = multiprocessing.Event()
    req_queue = multiprocessing.Queue()
    resp_queues = {}
    N = 10
    for _ in range(N):  # start N subprocesses
        resp_queue = multiprocessing.Queue()
        process = multiprocessing.Process(
            target=work, args=(event, req_queue, resp_queue))
        resp_queues[process.name] = resp_queue
        process.start()
    for _ in range(100):  # handle 100 requests
        (name, x) = req_queue.get()
        y = x ** 2
        resp_queues[name].put(y)
    event.set()

我面临的问题是这个程序(在Python 3.11.2下)的执行有时永远不会停止,一旦主进程通知子进程在

y = resp_queue.get()
行停止,就会在某些子进程中挂在
event.set()
行。如果我使用
threading
库而不是
multiprocessing
库,问题是一样的。

如何停止子进程?

python multiprocessing pipe client-server message-queue
2个回答
1
投票

queue.get()
是一个阻塞函数,到达它的线程(进程)会一直等到一个项目被放入队列,如果它已经到达
get()
线,它不会被设置事件唤醒。

通常的做法(即使在标准模块中)是在队列中发送一个

None
(或另一个无意义的对象)来唤醒在队列中等待的进程,并让它们在没有更多工作时终止。

event.set()
for queue_obj in resp_queues:
    queue_obj.put(None)

这使您的事件仅对提前终止有用,但如果不需要提前终止,您可以完全忽略工人的事件。

def work(event, req_queue, resp_queue):
    while True:
        ...
        y = resp_queue.get()
        print(name, 'output:', y)
        if y is None:
            break

很明显,如果主进程失败,仅使用

queue.get()
会导致资源泄漏,因此您应该做的另一个解决方案是在队列上使用超时,而不是让它永远等待。

y = resp_queue.get(timeout=0.1)

这确保进程最终会在“意外故障”时终止,但发送

None
是用于即时终止的。

如果您在整个代码中嵌入了多个

resp_queue.get()
,那么一个简单的
break on None
将不起作用,那么您可以在收到
sys.exit()
时使用
None
来终止工作人员,这将进行必要的清理,并且只能被捕获是一个裸
except:
,拦截
None
并调用
sys.exit
的代码可以隐藏在
multiprocessing.queues.Queue
的子类中。

class my_queue(multiprocessing.queues.Queue):
    def get(self, block: bool = False, timeout: Optional[float] = None) -> object:
        # set a default alternative timeout if you want
        return_value = super().get(block=block, timeout=timeout)  
        if return_value is None:  # or another dummy class as a signal
            sys.exit()  # or raise a known exception
        return return_value

1
投票

这是 @AhmedAEK's 的替代解决方案,它适用于

work
函数中开放数量的请求-响应对,即开放数量的
req_queue.put((name, x))
y = resp_queue.get()
调用对(在我的真正的程序我不控制工人代码,因为它可以由用户在子类中重新定义):

...

if __name__ == '__main__':
    ...
    event.set()
    try:
        while True:
            (name, x) = req_queue.get(timeout=1)
            resp_queues[name].put(None)
    except queue.Empty:
        pass
© www.soinside.com 2019 - 2024. All rights reserved.