Python Processpoolexecutor - 杀死队列?

问题描述 投票:1回答:2

这是我在stackoverflow上的第一个问题。我几乎能够在这里找到我需要知道的东西。非常感谢这个顺便说一下。

然而。如果我试图杀死我的ProcessPoolExecutor,它将只通过生成的整个队列工作(我想是这样吗?)。有没有简单的方法可以立即清理Processpoolexecutor的队列?

from concurrent.futures import ProcessPoolExecutor
from time import sleep
from random import randint


def something_fancy():
    sleep(randint(0, 5))
    return 'im back!'


class Work:
    def __init__(self):
        self.exe = ProcessPoolExecutor(4)

    def start_procs(self):
        for i in range(300):
            t = self.exe.submit(something_fancy)
            t.add_done_callback(self.done)

    def done(self, f):
        print f.result()

    def kill(self):
        self.exe.shutdown()


if __name__ == '__main__':
    work_obj = Work()
    work_obj.start_procs()
    sleep(5)
    work_obj.kill()

所以我想要做的是生成一个由300个队列组成的队列300。 5秒后,它应该退出。

我需要使用流程,因为gil btw。

python concurrent.futures
2个回答
0
投票

使用shutdown(wait=False)它会更快地返回。 wait的默认值是True否则它还提供了一个.Cancel(),如果不能清除则返回False。

link to the doku

尽管如此,它仍将完成所有游行:

如果waitTrue,则此方法将不会返回,直到所有待处理的期货完成执行并且与执行程序关联的资源已被释放。

如果waitFalse,则此方法将立即返回,并且当所有待处理的期货完成执行时,将释放与执行程序关联的资源。无论wait的值如何,整个Python程序都不会退出,直到所有待处理的期货都执行完毕。

如果您有固定的时间,则应提供超时:

map(func, *iterables, timeout=None, chunksize=1)

它可以是一个浮点数或整数 - 但是纪录片不会告诉它是ms还是什么......


0
投票

谢谢Patrick

通过提示我可以通过将Futures添加到列表并手动调整队列大小来取消每个进程。如果没有它,仍有很多流程正在推出。

看起来似乎没有api来调整队列大小,暂停执行或删除进程队列。

但是 - 实现这一点的唯一方法是在线程中运行Main对象,以便主脚可以随时终止它。我仍然试图抓住“CancelledError”。

对我来说看起来很“脏”而不是pythonic。我会接受任何其他建议。非常感谢。

from concurrent.futures import ProcessPoolExecutor, CancelledError
from time import sleep
from random import randint
from threading import Thread


def something_fancy():
    sleep(randint(0, 5))
    return 'im back!'


class Work:
    def __init__(self):
        self.exe = ProcessPoolExecutor(4)
        self.futures = []
        self.max_queue = 50
        self.killed = False

    def start_procs(self):
        for i in range(200000):
            while not self.killed:
                if len(self.futures) <= self.max_queue:
                    t = self.exe.submit(something_fancy)
                    t.add_done_callback(self.done)
                    self.futures.append(t)
                    break

    def done(self, f):
        print f.result()
        self.futures.remove(f)

    def kill(self):
        self.killed = True
        for future in self.futures:
            try:
                future.cancel()
            except CancelledError, e:
                print e


if __name__ == '__main__':
    work_obj = Work()
    Thread(target=work_obj.start_procs).start()
    sleep(5)
    work_obj.kill()

edit

from concurrent.futures import ProcessPoolExecutor, CancelledError
from time import sleep
from random import randint
from threading import Thread


def something_fancy():
    sleep(0.5)
    return 'Hello World, Future was running!'


class Work:
    def __init__(self):
        cpu_usage = 4
        self.exe = ProcessPoolExecutor(cpu_usage)
        self.futures = []
        self.max_queue = cpu_usage*3
        self.stop = False
        self.paused = False

    def start_procs(self):
        for i in range(200000):
            while not self.stop:
                if len(self.futures) <= self.max_queue:
                    if not self.paused:
                        t = self.exe.submit(something_fancy)
                        t.add_done_callback(self._done)
                        self.futures.append(t)
                        break

    def _done(self, f):
        print f.result()
        self.futures.remove(f)

    def pause(self):
        self.paused = False if self.paused else True

    def shutdown(self):
        self.stop = True
        for future in self.futures:
            try:
                future.cancel()
            except CancelledError, e:
                print e


if __name__ == '__main__':
    work_obj = Work()
    Thread(target=work_obj.start_procs).start()
    print 'Started'
    sleep(5)
    work_obj.pause()
    print 'Paused'
    sleep(5)
    work_obj.pause()
    print 'Continue'
    sleep(5)
    work_obj.shutdown()
    print 'Shutdown'

这工作 - 仍然没有捕获CancelledError,仍然很脏。

© www.soinside.com 2019 - 2024. All rights reserved.