我正在编写一些代码,这些代码与如下模式并行运行作业:
/tmp/test.py
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import freeze_support, get_context
import numpy as np
def job(i):
print("job", i)
np.linalg.svd(np.random.default_rng(0).normal(size=(1000, 1000)))
return i
if __name__ == "__main__":
freeze_support()
with ProcessPoolExecutor(1, mp_context=get_context("spawn")) as pool:
jobs = [pool.submit(job, i) for i in range(5)]
for res in jobs:
print("gather", res.result())
从打开
htop
开始,我注意到当我 ^C 时,期货仍在运行。例如,下面是我运行上面的 /tmp/test.py
时终端中的输出:
me@machine / % python /tmp/test.py
job 0
job 1
gather 0
job 2
gather 1
^Cjob 3
job 4
job 5
^CTraceback (most recent call last):
File "/tmp/test.py", line 18, in <module>
print("gather", res.result())
File ".../python/concurrent/futures/_base.py", line 453, in result
self._condition.wait(timeout)
File ".../python/threading.py", line 320, in wait
waiter.acquire()
KeyboardInterrupt
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/tmp/test.py", line 15, in <module>
with ProcessPoolExecutor(1, mp_context=get_context("spawn")) as pool:
File ".../python/concurrent/futures/_base.py", line 649, in __exit__
self.shutdown(wait=True)
File ".../python/concurrent/futures/process.py", line 775, in shutdown
self._executor_manager_thread.join()
File ".../python/threading.py", line 1096, in join
self._wait_for_tstate_lock()
File ".../python/threading.py", line 1116, in _wait_for_tstate_lock
if lock.acquire(block, timeout):
KeyboardInterrupt
^CException ignored in atexit callback: <function _exit_function at 0x100f54280>
Traceback (most recent call last):
File ".../python/multiprocessing/util.py", line 357, in _exit_function
p.join()
File ".../python/multiprocessing/process.py", line 149, in join
res = self._popen.wait(timeout)
File ".../python/multiprocessing/popen_fork.py", line 43, in wait
return self.poll(os.WNOHANG if timeout == 0.0 else 0)
File ".../python/multiprocessing/popen_fork.py", line 27, in poll
pid, sts = os.waitpid(self.pid, flag)
KeyboardInterrupt:
me@machine / % job 6
job 7
在这里,在我按一次 ^C 后,该过程继续进行。我让它再运行一点,然后在点击 ^C^C 后能够将其关闭。然而,在Jupyter Lab中,发送多个中断似乎无法关闭池。
我的问题:我上面的理解对吗?手动
.submit
执行器时是否有更好的方法来处理中断? Python 的文档没有描述执行器及其未来在中断时会发生什么,我找不到有用的来源,但是我是否错过了某个地方的文档页面?最后,我应该遵循另一种模式吗?