考虑以下代码片段:
import concurrent.futures
import time
def throw_func(a):
print(a)
time.sleep(10)
raise ValueError(a)
params = list(range(5000))
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
future_to_row = {executor.submit(throw_func, param): param for param in params}
for future in concurrent.futures.as_completed(future_to_row):
row = future_to_row[future]
future.result()
似乎当出现未处理的异常时,程序仍然尝试完成所有提交的 future。
并且屏幕上没有打印任何异常消息。
为什么会发生这种情况?如何确保出现未处理的异常时程序会退出?
我的一个项目也遇到了同样的情况,这是我当时想出的解决方案。不幸的是,
concurrent.futures
模块中没有内置机制可以立即停止执行器中所有正在运行的线程。您可以创建一个自定义线程类来检查外部“停止”信号,然后实现一种机制,例如可以在发生异常时设置的事件。处理每个线程内的异常,并在发生异常时通知其他线程停止。我根据您的用例修改了我的解决方案:
import threading
import time
class StoppableThread(threading.Thread):
def __init__(self, func, args, stop_event):
super().__init__()
self.func = func
self.args = args
self.stop_event = stop_event
# Signal other threads to stop for exception
def run(self):
try:
self.func(*self.args)
except Exception as e:
print(f"Exception in thread: {e}")
self.stop_event.set()
def main(a, stop_event):
print(f"Thread {a} started")
time.sleep(10)
if a == 2:
raise ValueError(a)
if stop_event.is_set():
print(f"Thread {a} stopping early")
return
print(f"Thread {a} finished")
stop_event = threading.Event()
threads = [StoppableThread(main, (i, stop_event), stop_event) for i in range(5)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
print("All threads completed or stopped")
输出:
Thread 0 started
Thread 1 started
Thread 2 started
Thread 3 started
Thread 4 started
Thread 0 finished
Exception in thread: 2
Thread 1 stopping early
Thread 3 stopping early
Thread 4 stopping early
All threads completed or stopped
这可能不是解决此问题的最佳方法,因此如果您能找到更好的方法,请告诉我。
shutdown
功能。仔细阅读文档,了解如何调整参数以更好地达到预期结果。
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
future_to_row = {executor.submit(throw_func, param): param for param in params}
for future in concurrent.futures.as_completed(future_to_row):
row = future_to_row[future]
try:
future.result()
except Exception as e:
print(row, e)
executor.shutdown(wait=True, cancel_futures=True)
break
# not needed in real situation
print("checking that the shutdown actually did its job")
for future, p in future_to_row.items():
print(p, future.done())
输出
0
1
2
3
4
1 # <- 1st repeated value is the 1st function to be finished, hence the exception. All next are pending futures
5
2306
checking that the shutdown actually did its job
0 True
1 True
2 True
3 True
4 True
5 True
6 True
7 True
8 True
9 True
10 True
11 True
12 True
13 True
14 True
15 True
16 True
...
4999 True