Python 并发 futures 即使在未处理的异常上也能执行

问题描述 投票:0回答:2

考虑以下代码片段:

import concurrent.futures
import time


def throw_func(a):
    print(a)
    time.sleep(10)
    raise ValueError(a)
    

params = list(range(5000))
    
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    future_to_row = {executor.submit(throw_func, param): param for param in params}
    for future in concurrent.futures.as_completed(future_to_row):
        row = future_to_row[future]
        future.result()

似乎当出现未处理的异常时,程序仍然尝试完成所有提交的 future。

并且屏幕上没有打印任何异常消息。

为什么会发生这种情况?如何确保出现未处理的异常时程序会退出?

python multithreading
2个回答
0
投票

我的一个项目也遇到了同样的情况,这是我当时想出的解决方案。不幸的是,

concurrent.futures
模块中没有内置机制可以立即停止执行器中所有正在运行的线程。您可以创建一个自定义线程类来检查外部“停止”信号,然后实现一种机制,例如可以在发生异常时设置的事件。处理每个线程内的异常,并在发生异常时通知其他线程停止。我根据您的用例修改了我的解决方案:

import threading
import time

class StoppableThread(threading.Thread):
    def __init__(self, func, args, stop_event):
        super().__init__()
        self.func = func
        self.args = args
        self.stop_event = stop_event
        
    # Signal other threads to stop for exception
    def run(self):
        try:
            self.func(*self.args)
        except Exception as e:
            print(f"Exception in thread: {e}")
            self.stop_event.set()  

def main(a, stop_event):
    print(f"Thread {a} started")
    time.sleep(10)
    if a == 2:
        raise ValueError(a)
    if stop_event.is_set():
        print(f"Thread {a} stopping early")
        return
    print(f"Thread {a} finished")

stop_event = threading.Event()


threads = [StoppableThread(main, (i, stop_event), stop_event) for i in range(5)]
for thread in threads:
    thread.start()

for thread in threads:
    thread.join()

print("All threads completed or stopped")

输出:

Thread 0 started
Thread 1 started
Thread 2 started
Thread 3 started
Thread 4 started
Thread 0 finished
Exception in thread: 2
Thread 1 stopping early
Thread 3 stopping early
Thread 4 stopping early
All threads completed or stopped

这可能不是解决此问题的最佳方法,因此如果您能找到更好的方法,请告诉我。


0
投票

执行器具有

shutdown
功能。仔细阅读文档,了解如何调整参数以更好地达到预期结果。

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    future_to_row = {executor.submit(throw_func, param): param for param in params}
    for future in concurrent.futures.as_completed(future_to_row):
        row = future_to_row[future]
        try:
            future.result()
        except Exception as e:
            print(row, e)
            executor.shutdown(wait=True, cancel_futures=True)
            break
    
    # not needed in real situation
    print("checking that the shutdown actually did its job")
    for future, p in future_to_row.items():
        print(p, future.done())    

输出

0
1
2
3
4
1 # <- 1st repeated value is the 1st function to be finished, hence the exception. All next are pending futures
5
2306
checking that the shutdown actually did its job
0 True
1 True
2 True
3 True
4 True
5 True
6 True
7 True
8 True
9 True
10 True
11 True
12 True
13 True
14 True
15 True
16 True
...
4999 True
© www.soinside.com 2019 - 2024. All rights reserved.