在 Windows 上使 Python 函数超时的最简单方法

问题描述 投票:0回答:2

以下过程运行十秒钟。五秒后我想杀掉它。

import time

def hello():
    for _ in range(10):
        print("hello")
        time.sleep(1)

hello()

解决方案可能涉及线程、多处理或装饰器。类似的问题以前曾被问过(提前抱歉),但解决方案都非常复杂。像这样听起来很基本的事情应该只需几行代码就可以实现。

python python-multiprocessing python-multithreading python-decorators
2个回答
3
投票

取决于用例...对于这个非常具体的示例

import time

def hello(timeout=5):
    start_time = time.time()
    for _ in range(10):
        if time.time()-start_time > timeout:
           break
        print("hello")
        time.sleep(1)

hello()

这是你可以做到的一种方法

或者你可以使用多重处理

import multiprocessing
...
if __name__ == "__main__":
  proc = multiprocessing.Process(target=hello)
  proc.start()
  time.sleep(5)
  proc.terminate()

0
投票

您可以使用

multiprocessing.pool.ThreadPool
来完成此操作。这在 Windows 和 Linux 上都适用。

基本解决方案

import multiprocessing.pool

try:
    with multiprocessing.pool.ThreadPool() as pool:
        pool.apply_async(hello).get(timeout=5)
except multiprocessing.TimeoutError:
    # do something if timeout

这将在单独的线程中运行

hello
。如果
hello
在 5 秒后仍未终止,则单独的线程将被终止。

    如果单独的线程在
  • get
     秒后尚未完成,
    multiprocessing.TimeoutError
    将引发
    timeout
  • 这将立即杀死单独的线程:
    • 退出
      with
      块会触发
      pool.__exit__()
    • 的执行
    • 我在文档中没有找到关于
      pool.__exit__()
      功能的解释,但我们可以在 其源代码 中看到它调用
      pool.terminate()
    • 根据其文档
      pool.terminate()
      立即停止工作进程而不完成未完成的工作。”。 (请注意,文档在这里讨论了worker processes,因为它是
      multiprocessing.pool.Pool
      的文档,它使用进程。
      ThreadPool
      提供与
      Pool
      相同的API,但使用线程。)

论据

您可以将参数传递给您的函数:

try:
    with multiprocessing.pool.ThreadPool() as pool:
        pool.apply_async(
            hello,
            (positional_argument, other_positional_argument),
            {keyword_argument=42, other_keyword_argument=1337}
        ).get(timeout=5)
except multiprocessing.TimeoutError:
    # do something if timeout

返回值

您可以获得函数的返回值:

try:
    with multiprocessing.pool.ThreadPool() as pool:
        return_value = pool.apply_async(hello).get(timeout=5)
except multiprocessing.TimeoutError:
    # do something if timeout
else:
    # do something with return_value

例外情况

如果您的函数引发异常,它将被透明地重新引发:

try:
    with multiprocessing.pool.ThreadPool() as pool:
        pool.apply_async(hello).get(timeout=5)
except multiprocessing.TimeoutError:
    # do something if timeout
except SomeExceptionRaisedByHello as e:
    # do something with e

使用进程而不是线程

如果您想在单独的进程而不是单独的线程中运行函数,只需使用

multiprocessing.pool.Pool
。它具有与
multiprocessing.pool.ThreadPool
相同的 API。

请注意,在这种情况下您必须使用

if __name__ == '__main__'
:

if __name__ == "__main__":
    try:
        with multiprocessing.pool.Pool() as pool:
            pool.apply_async(hello)
    except multiprocessing.TimeoutError:
        # do something if timeout

注意事项
concurrent.futures.Executor

文档保留以下关于

ThreadPool
:

用户通常应该更喜欢使用concurrent.futures.ThreadPoolExecutor,它有一个更简单的接口,从一开始就围绕线程设计,并且返回与许多其他库(包括asyncio)兼容的concurrent.futures.Future实例。

concurrent.futures
有一个
Executor
类,提供与
multiprocessing
的池类似的功能。然而,使用
Executor
实例来使函数超时,就像我们对
multiprocessing
的池所做的那样,会带来以下问题:

使用

Executor
,发生超时时不可能杀死单独的线程或进程(参见这个问题)。您可以等待单独的线程或进程,直到发生超时,然后继续执行主程序,但单独的线程或进程仍将并行运行,直到目标函数终止。

如果您想要超时的函数可能永远不会终止或者可能需要很长时间才能终止,那么这尤其成问题,因为即使您的主程序终止,它的执行也会继续。在这种情况下,您的 python 脚本不会终止,而是在执行结束时永远挂起,等待执行目标函数的单独线程或进程。

© www.soinside.com 2019 - 2024. All rights reserved.