Retrying failed futures in Python's ThreadPoolExecutor

Question · 0 votes · 3 answers

I want to implement retry logic with Python's concurrent.futures.ThreadPoolExecutor. I would like the following properties:

  1. As soon as a future fails, a new future is added to the work queue.
  2. A retried future can itself be retried, either indefinitely or up to a maximum number of retries.

Much of the existing code I have found online essentially runs in "rounds": it calls as_completed on the initial list of futures, resubmits failed futures while collecting them into a new list, and then loops back to call as_completed on the new list (if it is non-empty). Basically like this:

with concurrent.futures.ThreadPoolExecutor(...) as executor:
    futures = {executor.submit(fn, job): job for job in jobs}
    while len(futures) > 0:
        new_futures = {}
        for fut in concurrent.futures.as_completed(futures):
            if fut.exception():
                job = futures[fut]
                # resubmit; the new future is only examined in the next round
                new_futures[executor.submit(fn, job)] = job
            else:
                ...  # logic to handle successful job
        futures = new_futures

However, I don't think this satisfies the first property: a retried future may well complete before the initial futures do, but we won't process it until all of the initial futures have completed.

Here is a hypothetical pathological case. Suppose we have two jobs: the first runs for 1 second but fails with 90% probability, and the second runs for 100 seconds. If our executor has 2 workers and the first job fails after 1 second, we retry it immediately. But if it fails again, we cannot retry it until the second job completes.
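
To make the delay concrete, here is a minimal self-contained sketch of the round-based loop above (the job names short_job/long_job, the sleep times, and the fail-twice-then-succeed pattern are invented for illustration). The second failure actually happens around t = 2 s, but the loop only notices it at t ≈ 10 s, once the long job has finished:

import concurrent.futures
import itertools
import time

attempts = itertools.count(1)

def short_job():
    time.sleep(1)                 # stand-in for the flaky 1 s job
    if next(attempts) <= 2:       # fail the first two attempts
        raise RuntimeError('short job failed')
    return 'short done'

def long_job():
    time.sleep(10)                # stand-in for the 100 s job
    return 'long done'

start = time.monotonic()
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = {executor.submit(job): job for job in (short_job, long_job)}
    while futures:
        new_futures = {}
        for fut in concurrent.futures.as_completed(futures):
            job, elapsed = futures[fut], time.monotonic() - start
            if fut.exception():
                # the second failure is only noticed here, after long_job ends
                print(f'{elapsed:5.1f}s noticed failure of {job.__name__}')
                new_futures[executor.submit(job)] = job
            else:
                print(f'{elapsed:5.1f}s {job.__name__} -> {fut.result()}')
        futures = new_futures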


So my question is: is it possible to implement retry logic with these desired properties without using external libraries or rewriting low-level executor logic? One thing I tried was putting the retry logic into the code sent to the workers:

def worker_job(fn):
    try:
        return fn()
    except Exception:
        executor.submit(fn)

with concurrent.futures.ThreadPoolExecutor(...) as executor:
    jobs = [functools.partial(fn, arg) for arg in args]
    executor.map(worker_job, jobs)

But submitting new jobs from inside a worker did not seem to work.

Tags: python, concurrent.futures
3 Answers

3 votes

Retry with as_completed

Simple approach

Loop on wait(..., return_when=FIRST_COMPLETED) instead of as_completed(...).

Trade-offs:

  1. Overhead from the pending futures (re-adding the waiter, building new_futures).
  2. Awkward if you want to specify an overall timeout.

with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = {executor.submit(fn, job): job for job in jobs}
    while len(futures) > 0:
        new_futures = {}
        done, pending = concurrent.futures.wait(
            futures, return_when=concurrent.futures.FIRST_COMPLETED)
        for fut in done:
            if fut.exception():
                job = futures[fut]
                new_futures[executor.submit(fn, job)] = job
            else:
                ...  # logic to handle successful job
        for fut in pending:
            job = futures[fut]
            new_futures[fut] = job
        futures = new_futures
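
The question's second property (a maximum retry count) fits naturally into this loop. Here is a sketch of that extension; run_with_retries and its max_retries parameter are hypothetical names I introduced, not an existing API, and jobs are assumed to be hashable so they can serve as dict keys:

import concurrent.futures

def run_with_retries(executor, fn, jobs, max_retries=3):
    # map each in-flight future to (job, attempts so far)
    futures = {executor.submit(fn, job): (job, 0) for job in jobs}
    results, failures = {}, {}
    while futures:
        done, pending = concurrent.futures.wait(
            futures, return_when=concurrent.futures.FIRST_COMPLETED)
        new_futures = {fut: futures[fut] for fut in pending}
        for fut in done:
            job, attempts = futures[fut]
            if fut.exception() is None:
                results[job] = fut.result()
            elif attempts < max_retries:
                new_futures[executor.submit(fn, job)] = (job, attempts + 1)
            else:
                failures[job] = fut.exception()  # give up on this job
        futures = new_futures
    return results, failures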

Efficient approach

Adapt as_completed(...) so that new futures can be added to its fs and pending sets after the call, using its internal waiter to listen for them.

Trade-off: maintenance (the code below copies CPython's private as_completed internals).

Benefit: the ability to specify an overall timeout if desired.

import time
# Private helpers from CPython's concurrent.futures._base (present in 3.7+);
# these are implementation details and may change between Python versions.
from concurrent.futures import TimeoutError
from concurrent.futures._base import (
    CANCELLED_AND_NOTIFIED, FINISHED, _AS_COMPLETED,
    _AcquireFutures, _create_and_install_waiters, _yield_finished_futures)

class AsCompletedWaiterWrapper:
    def __init__(self):
        self.fs = None
        self.pending = None
        self.waiter = None

    def listen(self, fut):
        with self.waiter.lock:
            self.fs.add(fut)
            self.pending.add(fut)
            fut._waiters.append(self.waiter)

    def as_completed(self, fs, timeout=None):
        """
        concurrent.futures.as_completed plus the 3 lines marked with +.
        """
        if timeout is not None:
            end_time = timeout + time.monotonic()

        fs = set(fs)
        total_futures = len(fs)
        with _AcquireFutures(fs):
            finished = set(
                    f for f in fs
                    if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
            pending = fs - finished
            waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
        self.fs = fs            # +
        self.pending = pending  # +
        self.waiter = waiter    # +
        finished = list(finished)
        try:
            yield from _yield_finished_futures(finished, waiter,
                                               ref_collect=(fs,))

            while pending:
                if timeout is None:
                    wait_timeout = None
                else:
                    wait_timeout = end_time - time.monotonic()
                    if wait_timeout < 0:
                        raise TimeoutError(
                                '%d (of %d) futures unfinished' % (
                                len(pending), total_futures))

                waiter.event.wait(wait_timeout)

                with waiter.lock:
                    finished = waiter.finished_futures
                    waiter.finished_futures = []
                    waiter.event.clear()

                # reverse to keep finishing order
                finished.reverse()
                yield from _yield_finished_futures(finished, waiter,
                                                   ref_collect=(fs, pending))

        finally:
            # Remove waiter from unfinished futures
            for f in fs:
                with f._condition:
                    f._waiters.remove(waiter)

Usage:

with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = {executor.submit(fn, job): job for job in jobs}
    w = AsCompletedWaiterWrapper()
    for fut in w.as_completed(futures):
        if fut.exception():
            job = futures[fut]
            new_fut = executor.submit(fn, job)
            futures[new_fut] = job
            w.listen(new_fut)
        else:
            ...  # logic to handle successful job
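
Because the wrapper keeps as_completed's timeout parameter, the overall deadline mentioned above can be passed straight through; the 30-second figure here is just an illustrative value:

try:
    for fut in w.as_completed(futures, timeout=30):
        ...  # same success/retry handling as above
except TimeoutError:
    ...  # some futures were still unfinished after 30 s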

Retry from a worker helper

Wait on events inside the with ... executor: block, because ThreadPoolExecutor.__exit__ shuts the executor down, after which no new futures can be scheduled.

Trade-offs:

  1. Does not work with ProcessPoolExecutor, since the worker references executor from the main process.
  2. Awkward if you want to specify an overall timeout.

import concurrent.futures
import functools
import threading

def worker_job(fn, event):
    try:
        rv = fn()
        event.set()  # signal success to the main thread
        return rv
    except Exception:
        # resubmit ourselves; works because threads share the executor
        executor.submit(worker_job, fn, event)

with concurrent.futures.ThreadPoolExecutor() as executor:
    jobs = [functools.partial(fn, arg) for arg in args]
    events = [threading.Event() for _ in range(len(jobs))]
    executor.map(worker_job, jobs, events)
    for e in events:
        e.wait()
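
If unbounded retries are a concern, one variation (my own sketch, not part of this answer) threads a retry budget through the resubmission and sets the event on exhaustion, so the e.wait() loop cannot hang forever:

def worker_job(fn, event, retries_left=3):
    try:
        rv = fn()
        event.set()
        return rv
    except Exception:
        if retries_left > 0:
            executor.submit(worker_job, fn, event, retries_left - 1)
        else:
            event.set()  # give up, but still unblock the waiting main thread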

0 votes

You said:

    But if it fails again, we cannot retry it until the second job completes.

But I don't think this is true. The code says:

        for fut in concurrent.futures.as_completed(futures):

This is a bit subtle: futures is not being used here as a dict but as an iterable, specifically an iterable of the futures to check for completion. That iterable supplies the set of futures to watch, and as_completed() then yields each one as it completes. So in your pathological case, the 1 s job can indeed be retried many times before the 100 s job completes.
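
A tiny self-contained illustration of the yielding behavior described here (the sleep times are invented): as_completed() yields futures in completion order, not submission order.

import concurrent.futures
import time

def sleepy(seconds):
    time.sleep(seconds)
    return seconds

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futs = [executor.submit(sleepy, s) for s in (2, 1)]
    for fut in concurrent.futures.as_completed(futs):
        print(fut.result())  # prints 1, then 2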


0 votes

A simple working example of how to retry with as_completed plus a while loop, with an overall timeout for the whole run:

from random import random
from time import sleep, time
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
 
# task that sleeps and could fail
def work(identifier):
    sleep(random())
    # fail chance of 80%
    if random() < 0.8:
        raise Exception(f'Something bad happened {identifier}')
    return f'Completed {identifier}'
 
if __name__ == '__main__':

    with ProcessPoolExecutor(10) as executor:
        futures_to_data = {executor.submit(work, i):i for i in range(10)}
        
        timeout_start = time()
        max_run_time = 4 #seconds
        while len(futures_to_data) > 0 and time() < timeout_start + max_run_time:
            # NOTE: as_completed snapshots the current futures; retries submitted
            # inside this loop are only seen on the next pass of the while loop
            for future in as_completed(futures_to_data):
                if future.exception():
                    data = futures_to_data[future]
                    del futures_to_data[future]

                    future_retry = executor.submit(work, data)
                    futures_to_data[future_retry] = data

                    print(f'Failure, adding to retry {data}')
                else:
                    print(f'Success, {future.result()}')
                    del futures_to_data[future]

            print(f'Retries: {futures_to_data.values()}, time remaining {timeout_start + max_run_time - time()}')