我使用
concurrent.futures
同时运行多个线程。所有这些都是成功运行代码中后续步骤才能成功所必需的。
虽然在所有进程结束时我可以通过运行
.result()
引发任何异常,但理想情况下,单个线程中引发的任何异常都会立即停止所有线程。这将有助于更快地识别任何任务中的错误,而不是等到所有长时间运行的进程完成。
这可能吗?
可以在第一次异常后退出并且不向执行器提交任何新作业。但是,作业一旦提交就无法取消,您必须等待所有提交的作业完成(或超时)。有关详细信息,请参阅此问题。这是一个简短的示例,一旦发生第一个异常,就会取消所有未提交的作业。但是,它仍然等待已提交的作业完成。这使用
concurrent.futures
FIRST_EXCEPTION
文档中列出的
wait(return_when=...)
常量。
import time
import concurrent.futures
def example(i):
print(i)
assert i != 1
time.sleep(i)
return i
if __name__ == "__main__":
futures = []
with concurrent.futures.ThreadPoolExecutor() as executor:
for number in range(5):
futures.append(executor.submit(example, number))
exception = False
for completed, running_or_error in concurrent.futures.wait(futures, return_when="FIRST_EXCEPTION"):
try:
running_or_error.result()
except Exception as e:
for future in futures:
print(future.cancel()) # cancel all unstarted futures
raise e
我看到了 SNygard 的答案。例外的未来似乎是在已完成的任务中,而不是仍在运行的任务中。拿去
import concurrent.futures
import time
def job(i):
if i == 1:
raise ValueError(i)
print(f"Job {i} started")
time.sleep(2)
print(f"Job {i} finished")
if __name__ == "__main__":
with concurrent.futures.ThreadPoolExecutor(2) as pool:
start = time.time()
tasks = [pool.submit(job, i) for i in range(6)]
done_tasks, not_done_tasks = concurrent.futures.wait(tasks, return_when=concurrent.futures.FIRST_EXCEPTION)
for task in done_tasks:
err = task.exception()
if err is not None:
n_cancelled = 0
for not_done in not_done_tasks:
n_cancelled += 1
print(f"Cancelling {n_cancelled} at {time.time() - start}")
not_done.cancel()
print(f"Raising at {time.time() - start}")
raise RuntimeError from err
运行这个会给出类似的东西
Job 0 started
Cancelling 1 at 0.00018787384033203125
Job 2 started
Cancelling 2 at 0.00021409988403320312
Cancelling 3 at 0.0002319812774658203
Cancelling 4 at 0.00023603439331054688
Cancelling 5 at 0.0002391338348388672
Raising at 0.00024199485778808594
Job 0 finished
Job 2 finished
Traceback (most recent call last):
File "path/to/python3.11/concurrent/futures/thread.py", line 58, in run
result = self.fn(*self.args, **self.kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "path/to/file.py", line 7, in job
raise ValueError(i)
ValueError: 1
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "path/to/file.py", line 28, in <module>
raise RuntimeError from err
RuntimeError