立即在并发中引发异常

问题描述 投票:0回答:2

我使用

concurrent.futures
同时运行多个线程。所有这些都是成功运行代码中后续步骤才能成功所必需的。

虽然在所有进程结束时我可以通过运行

.result()
引发任何异常,但理想情况下,单个线程中引发的任何异常都会立即停止所有线程。这将有助于更快地识别任何任务中的错误,而不是等到所有长时间运行的进程完成。

这可能吗?

python multithreading concurrency
2个回答
2
投票

可以在第一次异常后退出并且不向执行器提交任何新作业。但是,作业一旦提交就无法取消,您必须等待所有提交的作业完成(或超时)。有关详细信息,请参阅此问题。这是一个简短的示例,一旦发生第一个异常,就会取消所有未提交的作业。但是,它仍然等待已提交的作业完成。这使用

concurrent.futures 
FIRST_EXCEPTION 文档
中列出的 
wait(return_when=...)
常量。

import time
import concurrent.futures

def example(i):
    print(i)
    assert i != 1
    time.sleep(i)
    return i

if __name__ == "__main__":
    futures = []
    with concurrent.futures.ThreadPoolExecutor() as executor:
        for number in range(5):
            futures.append(executor.submit(example, number))

        exception = False
        for completed, running_or_error in concurrent.futures.wait(futures, return_when="FIRST_EXCEPTION"):
            try:
                 running_or_error.result()
            except Exception as e:
                for future in futures:
                    print(future.cancel()) # cancel all unstarted futures
                raise e

-1
投票

我看到了 SNygard 的答案。例外的未来似乎是在已完成的任务中,而不是仍在运行的任务中。拿去

import concurrent.futures
import time


def job(i):
    if i == 1:
        raise ValueError(i)

    print(f"Job {i} started")
    time.sleep(2)
    print(f"Job {i} finished")


if __name__ == "__main__":
    with concurrent.futures.ThreadPoolExecutor(2) as pool:
        start = time.time()
        tasks = [pool.submit(job, i) for i in range(6)]
        done_tasks, not_done_tasks = concurrent.futures.wait(tasks, return_when=concurrent.futures.FIRST_EXCEPTION)
        for task in done_tasks:
            err = task.exception()
            if err is not None:
                n_cancelled = 0
                for not_done in not_done_tasks:
                    n_cancelled += 1
                    print(f"Cancelling {n_cancelled} at {time.time() - start}")
                    not_done.cancel()
                print(f"Raising at {time.time() - start}")
                raise RuntimeError from err

运行这个会给出类似的东西

Job 0 started
Cancelling 1 at 0.00018787384033203125
Job 2 started
Cancelling 2 at 0.00021409988403320312
Cancelling 3 at 0.0002319812774658203
Cancelling 4 at 0.00023603439331054688
Cancelling 5 at 0.0002391338348388672
Raising at 0.00024199485778808594
Job 0 finished
Job 2 finished
Traceback (most recent call last):
  File "path/to/python3.11/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "path/to/file.py", line 7, in job
    raise ValueError(i)
ValueError: 1

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "path/to/file.py", line 28, in <module>
    raise RuntimeError from err
RuntimeError
© www.soinside.com 2019 - 2024. All rights reserved.