是否可以取消`KeyboardInterrupt`上的未决期货?

问题描述 投票:0回答:0

我正在编写一些代码,这些代码与如下模式并行运行作业:

/tmp/test.py

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import freeze_support, get_context

import numpy as np


def job(i):
    print("job", i)
    np.linalg.svd(np.random.default_rng(0).normal(size=(1000, 1000)))
    return i


if __name__ == "__main__":
    freeze_support()
    with ProcessPoolExecutor(1, mp_context=get_context("spawn")) as pool:
        jobs = [pool.submit(job, i) for i in range(5)]
        for res in jobs:
            print("gather", res.result())

从打开

htop
开始,我注意到当我 ^C 时,期货仍在运行。例如,下面是我运行上面的
/tmp/test.py
时终端中的输出:

me@machine / % python /tmp/test.py
job 0
job 1
gather 0
job 2
gather 1
^Cjob 3
job 4
job 5
^CTraceback (most recent call last):
  File "/tmp/test.py", line 18, in <module>
    print("gather", res.result())
  File ".../python/concurrent/futures/_base.py", line 453, in result
    self._condition.wait(timeout)
  File ".../python/threading.py", line 320, in wait
    waiter.acquire()
KeyboardInterrupt

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/tmp/test.py", line 15, in <module>
    with ProcessPoolExecutor(1, mp_context=get_context("spawn")) as pool:
  File ".../python/concurrent/futures/_base.py", line 649, in __exit__
    self.shutdown(wait=True)
  File ".../python/concurrent/futures/process.py", line 775, in shutdown
    self._executor_manager_thread.join()
  File ".../python/threading.py", line 1096, in join
    self._wait_for_tstate_lock()
  File ".../python/threading.py", line 1116, in _wait_for_tstate_lock
    if lock.acquire(block, timeout):
KeyboardInterrupt
^CException ignored in atexit callback: <function _exit_function at 0x100f54280>
Traceback (most recent call last):
  File ".../python/multiprocessing/util.py", line 357, in _exit_function
    p.join()
  File ".../python/multiprocessing/process.py", line 149, in join
    res = self._popen.wait(timeout)
  File ".../python/multiprocessing/popen_fork.py", line 43, in wait
    return self.poll(os.WNOHANG if timeout == 0.0 else 0)
  File ".../python/multiprocessing/popen_fork.py", line 27, in poll
    pid, sts = os.waitpid(self.pid, flag)
KeyboardInterrupt: 

me@machine / % job 6
job 7

在这里,在我按一次 ^C 后,该过程继续进行。我让它再运行一点,然后在点击 ^C^C 后能够将其关闭。然而,在Jupyter Lab中,发送多个中断似乎无法关闭池。

我的问题:我上面的理解对吗?手动

.submit
执行器时是否有更好的方法来处理中断? Python 的文档没有描述执行器及其未来在中断时会发生什么,我找不到有用的来源,但是我是否错过了某个地方的文档页面?最后,我应该遵循另一种模式吗?

python multiprocessing concurrent.futures
© www.soinside.com 2019 - 2024. All rights reserved.