python多重处理-进程在大队列的连接上挂起

问题描述 投票:26回答:4

我正在运行python 2.7.3,并且注意到以下奇怪行为。考虑这个最小的例子:

from multiprocessing import Process, Queue

def foo(qin, qout):
    while True:
        bar = qin.get()
        if bar is None:
            break
        qout.put({'bar': bar})

if __name__ == '__main__':
    import sys

    qin = Queue()
    qout = Queue()
    worker = Process(target=foo,args=(qin,qout))
    worker.start()

    for i in range(100000):
        print i
        sys.stdout.flush()
        qin.put(i**2)

    qin.put(None)
    worker.join()

当我循环超过10,000个或更多时,我的脚本挂在worker.join()上。当循环仅达到1,000时,它可以正常工作。

有什么想法吗?

python process queue multiprocessing
4个回答
35
投票

子进程中的qout队列已满。您从foo()中放入的数据不适合内部使用的OS管道的缓冲区,因此子进程将阻止尝试容纳更多数据。但是父进程没有读取该数据:它也被简单地阻塞,等待子进程完成。这是一个典型的死锁。


4
投票

队列大小必须有限制。考虑以下修改:

from multiprocessing import Process, Queue

def foo(qin,qout):
    while True:
        bar = qin.get()
        if bar is None:
            break
        #qout.put({'bar':bar})

if __name__=='__main__':
    import sys

    qin=Queue()
    qout=Queue()   ## POSITION 1
    for i in range(100):
        #qout=Queue()   ## POSITION 2
        worker=Process(target=foo,args=(qin,))
        worker.start()
        for j in range(1000):
            x=i*100+j
            print x
            sys.stdout.flush()
            qin.put(x**2)

        qin.put(None)
        worker.join()

    print 'Done!'

此功能保持原样(qout.put行已注释掉)。如果尝试保存所有100000个结果,则qout太大:如果我取消注释qout.put({'bar':bar})中的foo,并在位置1中保留qout的定义,则代码将挂起。但是,如果我将qout定义移至POSITION 2,则脚本结束。

简而言之,您必须注意qinqout都不会太大。 (另请参见:Multiprocessing Queue maxsize limit is 32767


3
投票

[当尝试将字符串放入总大小约为5000 cahrs的队列中时,在python3上存在相同的问题。

在我的项目中,有一个宿主进程设置一个队列并启动子进程,然后加入。其他join主机进程从队列读取。当子进程产生太多数据时,主机挂在join上。我使用以下函数修复了此问题,以等待主机进程中的子进程:

from multiprocessing import Process, Queue
from queue import Empty

def yield_from_process(q: Queue, p: Process):
    while p.is_alive():
        p.join(timeout=1)
        while True:
            try:
                yield q.get(block=False)
            except Empty:
                break

我在队列填满后立即从队列中读取,因此永远不会变得很大


0
投票

在池关闭后,我试图.get()一个异步工作者

with块之外的缩进错误

我有这个

with multiprocessing.Pool() as pool:
    async_results = list()
    for job in jobs:
        async_results.append(
            pool.apply_async(
                _worker_func,
                (job,),
            )
        )
# wrong
for async_result in async_results:
    yield async_result.get()

我需要这个

with multiprocessing.Pool() as pool:
    async_results = list()
    for job in jobs:
        async_results.append(
            pool.apply_async(
                _worker_func,
                (job,),
            )
        )
    # right
    for async_result in async_results:
        yield async_result.get()
© www.soinside.com 2019 - 2024. All rights reserved.