Gevent:如何等待一组greenlets完成

问题描述 投票:0回答:1

我有一个在几个任务生成器之间共享的gevent.pool(具有固定大小)。如果有空闲插槽,每个任务生产者都可以将新的greenlet应用于池。将任务添加到池后,任务生成器应等待所有添加的任务完成。

我尝试使用gevent.queue.JoinableQueue等待所有任务完成。除非我在等待结束时遇到非常烦人的异常,否则它会起作用。

如何修复下面的代码以避免这种情况?也许我做错了什么?

from gevent import monkey, sleep; monkey.patch_all()
from gevent.queue import JoinableQueue
from gevent.pool import Pool

pool = Pool(3)


def worker(n):
    print 'Worker {} started'.format(n)
    sleep(1)
    print 'Worker {} finished'.format(n)
    return n


def main():
    results = []

    queue = JoinableQueue()
    for job_no in range(5):
        pool.wait_available()
        greenlet = pool.apply_async(worker, kwds=dict(n=job_no), callback=lambda ret: results.append(ret))
        queue.put(greenlet)
        sleep(.05)
    print 'All workers added'

    queue.join()
    print 'All workers finished', results


if __name__ == '__main__':
    main()

输出:

Worker 0 started
Worker 1 started
Worker 2 started
Worker 0 finished
Worker 3 started
Worker 1 finished
Worker 4 started
All workers added
Worker 2 finished
Worker 3 finished
Worker 4 finished
Traceback (most recent call last):
  File "main.py", line 32, in <module>
    main()
  File "main.py", line 27, in main
    queue.join()
  File "C:\Python\2.7.10\x64\lib\site-packages\gevent\queue.py", line 492, in join
    return self._cond.wait(timeout=timeout)
  File "C:\Python\2.7.10\x64\lib\site-packages\gevent\event.py", line 219, in wait
    return self._wait(timeout)
  File "C:\Python\2.7.10\x64\lib\site-packages\gevent\event.py", line 129, in _wait
    gotit = self._wait_core(timeout)
  File "C:\Python\2.7.10\x64\lib\site-packages\gevent\event.py", line 106, in _wait_core
    result = self.hub.switch()
  File "C:\Python\2.7.10\x64\lib\site-packages\gevent\hub.py", line 630, in switch
    return RawGreenlet.switch(self)
gevent.hub.LoopExit: ('This operation would block forever', <Hub at 0x26c1c28 select default pending=0 ref=0>)
python gevent greenlets
1个回答
1
投票

你得到'此操作将永远阻止'错误,因为没有greenlet来消耗队列中的任务,queue.join()只是阻塞直到所有greenlet完成,然后引发异常。

这里不需要JoinableQueue,使用gevent.joinall()等待所有greenlets完成:

import gevent

def main():
    results = []
    gs = []
    for job_no in range(5):
        greenlet = ..
        gs.append(greenlet)
    gevent.joinall(gs)
    print 'All workers finished', results
© www.soinside.com 2019 - 2024. All rights reserved.