I want to implement retry logic with Python's concurrent.futures.ThreadPoolExecutor. I'd like it to have a few properties; the first is that a completed future, including a retried one, should be handled as soon as it finishes.

Much of the existing code I've found online essentially runs in "rounds": it calls as_completed on the initial list of futures, resubmits failed futures while collecting them into a new list, and then calls as_completed on the new list if it is non-empty. Basically like this:
with concurrent.futures.ThreadPoolExecutor(...) as executor:
    futures = {executor.submit(fn, job): job for job in jobs}
    while len(futures) > 0:
        new_futures = {}
        for fut in concurrent.futures.as_completed(futures):
            if fut.exception():
                job = futures[fut]
                new_futures[executor.submit(fn, job)] = job
            else:
                ...  # logic to handle successful job
        futures = new_futures
However, I don't think this satisfies the first property, because a retried future may complete before the initial futures do, yet we won't process it until all of the initial futures have completed.

Here is a hypothetical pathological case. Suppose we have two jobs: the first runs for 1 second but fails with 90% probability, while the second runs for 100 seconds. If our executor has 2 worker threads and the first job fails after 1 second, we retry it immediately. But if it fails again, we cannot retry it until the second job completes.

So my question is: can retry logic with these desired properties be implemented without external libraries and without rewriting low-level executor logic? One thing I tried was putting the retry logic into the code sent to the workers:
def worker_job(fn):
    try:
        return fn()
    except Exception:
        executor.submit(fn)

with concurrent.futures.ThreadPoolExecutor(...) as executor:
    jobs = [functools.partial(fn, arg) for arg in args]
    executor.map(worker_job, jobs)

But submitting new jobs from inside a worker doesn't seem to work.
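For what it's worth, one concrete way to see why: the future returned by the inner executor.submit is dropped, so the outer future resolves successfully (with None) and the retry's outcome is unobservable; if the executor is already shutting down, the resubmission may also be rejected outright. A minimal sketch, with a hypothetical flaky function that fails once and then succeeds:

```python
import concurrent.futures
import itertools

counter = itertools.count()

def flaky():
    # hypothetical: fails on the first call, succeeds afterwards
    if next(counter) == 0:
        raise ValueError('boom')
    return 'ok'

def worker_job(fn):
    try:
        return fn()
    except Exception:
        # the retry's future is dropped here, so its result
        # (and any further failure) is invisible to the caller
        executor.submit(worker_job, fn)

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    fut = executor.submit(worker_job, flaky)
    print(fut.exception())  # None: the failure was swallowed
    print(fut.result())     # None, even though the retry returned 'ok'
```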
Loop with wait(..., return_when=FIRST_COMPLETED) instead of as_completed(...).
Trade-offs: per-iteration overhead on the pending futures (re-adding waiters, rebuilding new_futures), and no straightforward way to specify an overall timeout.
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = {executor.submit(fn, job): job for job in jobs}
    while len(futures) > 0:
        new_futures = {}
        done, pending = concurrent.futures.wait(
            futures, return_when=concurrent.futures.FIRST_COMPLETED)
        for fut in done:
            if fut.exception():
                job = futures[fut]
                new_futures[executor.submit(fn, job)] = job
            else:
                ...  # logic to handle successful job
        for fut in pending:
            job = futures[fut]
            new_futures[fut] = job
        futures = new_futures
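To illustrate that this loop really does satisfy the first property, here is a runnable sketch of the pathological scenario (the toy make_job helper, the durations, and the failure counts are my additions, chosen small so it runs quickly):

```python
import concurrent.futures
import time

def make_job(name, duration, fail_times):
    # hypothetical toy job factory: the job sleeps, fails
    # `fail_times` times, then succeeds
    state = {'fails': fail_times}
    def job():
        time.sleep(duration)
        if state['fails'] > 0:
            state['fails'] -= 1
            raise RuntimeError(f'{name} failed')
        return name
    return job

jobs = [make_job('short', 0.1, 2), make_job('long', 1.0, 0)]
results = []

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = {executor.submit(job): job for job in jobs}
    while futures:
        done, pending = concurrent.futures.wait(
            futures, return_when=concurrent.futures.FIRST_COMPLETED)
        new_futures = {fut: futures[fut] for fut in pending}
        for fut in done:
            job = futures[fut]
            if fut.exception():
                # resubmit immediately, without waiting for 'long'
                new_futures[executor.submit(job)] = job
            else:
                results.append(fut.result())
        futures = new_futures

print(results)  # ['short', 'long']: the retries of 'short' finish first
```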
Adapt as_completed(...) so that futures can be added to fs and pending after the fact, using the waiter.

Trade-off: maintenance burden, since it copies private implementation details that may change between Python versions. Pro: able to specify an overall timeout if needed.
import time
# private helpers copied by concurrent.futures.as_completed; these
# names may change between Python versions
from concurrent.futures._base import (
    CANCELLED_AND_NOTIFIED, FINISHED, _AS_COMPLETED,
    _AcquireFutures, _create_and_install_waiters, _yield_finished_futures)

class AsCompletedWaiterWrapper:
    def __init__(self):
        self.fs = None
        self.pending = None
        self.waiter = None

    def listen(self, fut):
        with self.waiter.lock:
            self.fs.add(fut)
            self.pending.add(fut)
            fut._waiters.append(self.waiter)

    def as_completed(self, fs, timeout=None):
        """
        concurrent.futures.as_completed plus the 3 lines marked with +.
        """
        if timeout is not None:
            end_time = timeout + time.monotonic()
        fs = set(fs)
        total_futures = len(fs)
        with _AcquireFutures(fs):
            finished = set(
                f for f in fs
                if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
            pending = fs - finished
            waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
            self.fs = fs            # +
            self.pending = pending  # +
            self.waiter = waiter    # +
        finished = list(finished)
        try:
            yield from _yield_finished_futures(finished, waiter,
                                               ref_collect=(fs,))
            while pending:
                if timeout is None:
                    wait_timeout = None
                else:
                    wait_timeout = end_time - time.monotonic()
                    if wait_timeout < 0:
                        raise TimeoutError(
                            '%d (of %d) futures unfinished' % (
                                len(pending), total_futures))
                waiter.event.wait(wait_timeout)
                with waiter.lock:
                    finished = waiter.finished_futures
                    waiter.finished_futures = []
                    waiter.event.clear()
                # reverse to keep finishing order
                finished.reverse()
                yield from _yield_finished_futures(finished, waiter,
                                                   ref_collect=(fs, pending))
        finally:
            # Remove waiter from unfinished futures
            for f in fs:
                with f._condition:
                    f._waiters.remove(waiter)
Usage:
with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = {executor.submit(fn, job): job for job in jobs}
    w = AsCompletedWaiterWrapper()
    for fut in w.as_completed(futures):
        if fut.exception():
            job = futures[fut]
            new_fut = executor.submit(fn, job)
            futures[new_fut] = job
            w.listen(new_fut)
        else:
            ...  # logic to handle successful job
Wait on the events inside the with ... executor: block, because ThreadPoolExecutor.__exit__ shuts down the executor, after which no new futures can be scheduled.

Trade-off: the events cannot be passed to worker processes, so this approach cannot be used with a ProcessPoolExecutor.

def worker_job(fn, event):
    try:
        rv = fn()
        event.set()
        return rv
    except Exception:
        executor.submit(worker_job, fn, event)

with concurrent.futures.ThreadPoolExecutor() as executor:
    jobs = [functools.partial(fn, arg) for arg in args]
    events = [threading.Event() for _ in range(len(jobs))]
    executor.map(worker_job, jobs, events)
    for e in events:
        e.wait()
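A runnable sketch of this event-based approach, with a hypothetical make_flaky helper that fails twice before succeeding, and a results dict added so the outcomes are observable:

```python
import concurrent.futures
import threading

def make_flaky(arg, fails=2):
    # hypothetical flaky task: raises `fails` times, then returns arg
    state = {'left': fails}
    def fn():
        if state['left'] > 0:
            state['left'] -= 1
            raise RuntimeError(f'{arg} failed')
        return arg
    return fn

results = {}

def worker_job(fn, event, key):
    try:
        results[key] = fn()
        event.set()
    except Exception:
        # resubmit; the event stays unset until a run succeeds
        executor.submit(worker_job, fn, event, key)

with concurrent.futures.ThreadPoolExecutor() as executor:
    jobs = [make_flaky(i) for i in range(3)]
    events = [threading.Event() for _ in jobs]
    for key, (job, event) in enumerate(zip(jobs, events)):
        executor.submit(worker_job, job, event, key)
    # wait inside the with block: __exit__ would otherwise shut down
    # the executor and reject the resubmitted retries
    for e in events:
        e.wait()

print(results)  # {0: 0, 1: 1, 2: 2}
```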
You said:

> But if it fails again, we cannot retry it until the second job completes.

But I don't think that's true. The code says:

    for fut in concurrent.futures.as_completed(futures):

This is a bit subtle, because futures is not being used here as a dict but as an iterable — specifically, an iterable of futures to check for completion. The iterable supplies the set of futures to watch, and as_completed() then yields them as they complete. So in your pathological case, the 1s job can indeed be retried multiple times before the 100s job completes.

Here is a simple working example of retrying with as_completed + a while loop, with an overall timeout for the whole process:
from random import random
from time import sleep, time
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed

# task that sleeps and could fail
def work(identifier):
    sleep(random())
    # fail chance of 80%
    if random() < 0.8:
        raise Exception(f'Something bad happened {identifier}')
    return f'Completed {identifier}'

if __name__ == '__main__':
    with ProcessPoolExecutor(10) as executor:
        futures_to_data = {executor.submit(work, i): i for i in range(10)}
        timeout_start = time()
        max_run_time = 4  # seconds
        while len(futures_to_data) > 0 and time() < timeout_start + max_run_time:
            for future in as_completed(futures_to_data):
                if future.exception():
                    data = futures_to_data[future]
                    del futures_to_data[future]
                    future_retry = executor.submit(work, data)
                    futures_to_data[future_retry] = data
                    print(f'Failure, adding to retry {data}')
                else:
                    print(f'Success, {future.result()}')
                    del futures_to_data[future]
            print(f'Retries, {futures_to_data.values()}, '
                  f'time remaining {timeout_start + max_run_time - time()}')
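One refinement that neither snippet covers, but that most real uses of retry logic need, is a cap on retries per job. A minimal sketch built on the same round-based loop (MAX_RETRIES, the attempts dict, and the succeeded set are my additions for illustration):

```python
import concurrent.futures
import random

MAX_RETRIES = 3
attempts = {}       # job id -> number of attempts made so far
succeeded = set()

def work(identifier):
    attempts[identifier] = attempts.get(identifier, 0) + 1
    if random.random() < 0.5:
        raise RuntimeError(f'Something bad happened {identifier}')
    return identifier

with concurrent.futures.ThreadPoolExecutor(4) as executor:
    # map each future to (job id, attempt number)
    futures = {executor.submit(work, i): (i, 1) for i in range(10)}
    while futures:
        new_futures = {}
        for fut in concurrent.futures.as_completed(futures):
            identifier, attempt = futures[fut]
            if fut.exception() is None:
                succeeded.add(fut.result())
            elif attempt < MAX_RETRIES:
                new_futures[executor.submit(work, identifier)] = (
                    identifier, attempt + 1)
            # else: give up on this job
        futures = new_futures
```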