我正在运行python 2.7.3,并且注意到以下奇怪行为。考虑这个最小的例子:
from multiprocessing import Process, Queue
def foo(qin, qout):
while True:
bar = qin.get()
if bar is None:
break
qout.put({'bar': bar})
if __name__ == '__main__':
import sys
qin = Queue()
qout = Queue()
worker = Process(target=foo,args=(qin,qout))
worker.start()
for i in range(100000):
print i
sys.stdout.flush()
qin.put(i**2)
qin.put(None)
worker.join()
当我循环超过10,000个或更多时,我的脚本挂在worker.join()
上。当循环仅达到1,000时,它可以正常工作。
有什么想法吗?
子进程中的qout
队列已满。您从foo()
中放入的数据不适合内部使用的OS管道的缓冲区,因此子进程将阻止尝试容纳更多数据。但是父进程没有读取该数据:它也被简单地阻塞,等待子进程完成。这是一个典型的死锁。
队列大小必须有限制。考虑以下修改:
from multiprocessing import Process, Queue
def foo(qin,qout):
while True:
bar = qin.get()
if bar is None:
break
#qout.put({'bar':bar})
if __name__=='__main__':
import sys
qin=Queue()
qout=Queue() ## POSITION 1
for i in range(100):
#qout=Queue() ## POSITION 2
worker=Process(target=foo,args=(qin,))
worker.start()
for j in range(1000):
x=i*100+j
print x
sys.stdout.flush()
qin.put(x**2)
qin.put(None)
worker.join()
print 'Done!'
此功能保持原样(qout.put
行已注释掉)。如果尝试保存所有100000个结果,则qout
太大:如果我取消注释qout.put({'bar':bar})
中的foo
,并在位置1中保留qout
的定义,则代码将挂起。但是,如果我将qout
定义移至POSITION 2,则脚本结束。
简而言之,您必须注意qin
和qout
都不会太大。 (另请参见:Multiprocessing Queue maxsize limit is 32767)
[当尝试将字符串放入总大小约为5000 cahrs的队列中时,在python3
上存在相同的问题。
在我的项目中,有一个宿主进程设置一个队列并启动子进程,然后加入。其他join
主机进程从队列读取。当子进程产生太多数据时,主机挂在join
上。我使用以下函数修复了此问题,以等待主机进程中的子进程:
from multiprocessing import Process, Queue
from queue import Empty
def yield_from_process(q: Queue, p: Process):
while p.is_alive():
p.join(timeout=1)
while True:
try:
yield q.get(block=False)
except Empty:
break
我在队列填满后立即从队列中读取,因此永远不会变得很大
在池关闭后,我试图.get()
一个异步工作者
with块之外的缩进错误
我有这个
with multiprocessing.Pool() as pool:
async_results = list()
for job in jobs:
async_results.append(
pool.apply_async(
_worker_func,
(job,),
)
)
# wrong
for async_result in async_results:
yield async_result.get()
我需要这个
with multiprocessing.Pool() as pool:
async_results = list()
for job in jobs:
async_results.append(
pool.apply_async(
_worker_func,
(job,),
)
)
# right
for async_result in async_results:
yield async_result.get()