一旦期货开始,你如何杀死它们?

问题描述 投票:0回答:4

我正在使用新的

concurrent.futures
模块(它也有一个 Python 2 反向移植)来执行一些简单的多线程 I/O。我无法理解如何干净地终止使用此模块启动的任务。

查看以下 Python 2/3 脚本,它重现了我所看到的行为:

#!/usr/bin/env python
from __future__ import print_function

import concurrent.futures
import time


def control_c_this():
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        future1 = executor.submit(wait_a_bit, name="Jack")
        future2 = executor.submit(wait_a_bit, name="Jill")

        for future in concurrent.futures.as_completed([future1, future2]):
            future.result()

        print("All done!")


def wait_a_bit(name):
    print("{n} is waiting...".format(n=name))
    time.sleep(100)


if __name__ == "__main__":
    control_c_this()

当这个脚本运行时,使用常规的 Control-C 键盘中断似乎不可能彻底杀死它。我在 OS X 上运行。

  • 在 Python 2.7 上,我必须从命令行求助于
    kill
    来终止脚本。 Control-C 只是被忽略。
  • 在 Python 3.4 上,如果按两次 Control-C,它就会起作用,但随后会转储很多奇怪的堆栈跟踪。

我在网上找到的大多数文档都讨论了如何使用旧的

threading
模块干净地杀死线程。这些似乎都不适用于此。

并且

concurrent.futures
模块中提供的用于停止操作的所有方法(如
Executor.shutdown()
Future.cancel()
)仅在 Future 尚未开始或完成时才起作用,在这种情况下这是毫无意义的。我想立即打断Future。

我的用例很简单:当用户按下 Control-C 时,脚本应该像任何行为良好的脚本一样立即退出。这就是我想要的。

那么使用

concurrent.futures
时获得此行为的正确方法是什么?

python multithreading concurrent.futures
4个回答
42
投票

有点痛苦。本质上,工作线程必须在主线程退出之前完成。除非他们这样做,否则您无法退出。典型的解决方法是拥有一些全局状态,每个线程都可以检查以确定它们是否应该做更多工作。

这里的引用解释了原因。本质上,如果线程在解释器退出时退出,则可能会发生不好的事情。

这是一个工作示例。请注意,由于子线程的睡眠持续时间,C-c 最多需要 1 秒来传播。

#!/usr/bin/env python
from __future__ import print_function

import concurrent.futures
import time
import sys

quit = False
def wait_a_bit(name):
    while not quit:
        print("{n} is doing work...".format(n=name))
        time.sleep(1)

def setup():
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
    future1 = executor.submit(wait_a_bit, "Jack")
    future2 = executor.submit(wait_a_bit, "Jill")

    # main thread must be doing "work" to be able to catch a Ctrl+C 
    # http://www.luke.maurits.id.au/blog/post/threads-and-signals-in-python.html
    while (not (future1.done() and future2.done())):
        time.sleep(1)

if __name__ == "__main__":
    try:
        setup()
    except KeyboardInterrupt:
        quit = True

12
投票

我遇到了这个问题,但我遇到的问题是,许多 future(数十个或数千个)将等待运行,只需按 Ctrl-C 让它们等待,而不是真正退出。我使用

concurrent.futures.wait
运行进度循环,需要添加
try ... except KeyboardInterrupt
来处理取消未完成的 Future。

POLL_INTERVAL = 5
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as pool:
    futures = [pool.submit(do_work, arg) for arg in large_set_to_do_work_over]
    # next line returns instantly
    done, not_done = concurrent.futures.wait(futures, timeout=0)
    try:
        while not_done:
            # next line 'sleeps' this main thread, letting the thread pool run
            freshly_done, not_done = concurrent.futures.wait(not_done, timeout=POLL_INTERVAL)
            done |= freshly_done
            # more polling stats calculated here and printed every POLL_INTERVAL seconds...
    except KeyboardInterrupt:
        # only futures that are not done will prevent exiting
        for future in not_done:
            # cancel() returns False if it's already done or currently running,
            # and True if was able to cancel it; we don't need that return value
            _ = future.cancel()
        # wait for running futures that the above for loop couldn't cancel (note timeout)
        _ = concurrent.futures.wait(not_done, timeout=None)

如果您有兴趣准确跟踪已完成的内容和未完成的内容(即不想要进度循环),则可以将第一个等待调用(带有

timeout=0
的那个)替换为
 not_done = futures
并仍然保留
while not_done:
逻辑。

for future in not_done:
取消循环可能会根据返回值(或写为推导式)而表现不同,但等待完成或取消的 future 并不是真正的等待 - 它会立即返回。最后一个
wait
timeout=None
确保池的正在运行的作业确实完成。

再次强调,只有当实际调用的

do_work
最终在合理的时间内返回时,这才可以正常工作。这对我来说很好 - 事实上,我想确保如果
do_work
开始,它就会运行到完成。如果
do_work
是“无尽的”,那么你需要类似 cdosborn 的答案 的东西,它使用一个对所有线程可见的变量,指示它们自行停止。


3
投票

聚会迟到了,但我也遇到了同样的问题。

我想立即终止我的程序,我不在乎发生了什么。我不需要 Linux 所能做到的彻底关闭。

我发现用

os.kill(os.getpid(), 9)
替换 KeyboardInterrupt 异常处理程序中的 geitda 代码会在第一个 ^C 之后立即退出。


0
投票
main = str(os.getpid())
def ossystem(c):
    return subprocess.Popen(c, shell=True, stdout=subprocess.PIPE).stdout.read().decode("utf-8").strip()
def killexecutor():
    print("Killing")
    pids = ossystem('ps -a | grep scriptname.py').split('\n')
    for pid in pids:
        pid = pid.split(' ')[0].strip()
        if(str(pid) != main):
            os.kill(int(pid), 9)


...
    killexecutor()
© www.soinside.com 2019 - 2024. All rights reserved.