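My understanding of how `ThreadPoolExecutor` works is that when I call `#submit`, tasks are assigned to threads until all available threads are busy. At that point the executor puts further tasks in a queue, where they wait for a thread to become available.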
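The behavior I want is to block when no thread is available, wait until one is, and only then submit my task.
The context is that my tasks come from a queue, and I only want to pull messages off that queue when there is a thread available to process them.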
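In an ideal world, I could pass `#submit` an option telling it to block when no thread is available instead of queuing the task.
However, no such option exists. So what I'm looking at is something like this: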
```python
with concurrent.futures.ThreadPoolExecutor(max_workers=CONCURRENCY) as executor:
    while True:
        wait_for_available_thread(executor)
        message = pull_from_queue()
        executor.submit(do_work_for_message, message)
```
and I'm not sure what the cleanest implementation of `wait_for_available_thread` would be.
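One way to implement `wait_for_available_thread` without polling is a counting semaphore: acquire a permit before pulling a message, and release it from the future's done callback. This is a swapped-in technique, not something `concurrent.futures` provides. The helper `run`, the `queue.Queue` named `source` (standing in for `pull_from_queue()`) and the `None` stop sentinel are hypothetical.

```python
import concurrent.futures
import queue
import threading


def run(source: queue.Queue, do_work_for_message, concurrency: int = 4) -> None:
    """Pull from `source` only while fewer than `concurrency` tasks are in flight.

    Hypothetical helper; a `None` in `source` is an assumed stop sentinel.
    """
    slots = threading.BoundedSemaphore(concurrency)
    with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor:
        while True:
            slots.acquire()  # wait_for_available_thread: blocks until a task finishes
            message = source.get()  # stands in for pull_from_queue()
            if message is None:
                slots.release()
                break
            try:
                future = executor.submit(do_work_for_message, message)
            except BaseException:
                slots.release()  # don't leak the permit if submit() fails
                raise
            # Return the permit when the task finishes, whether it succeeded or raised.
            future.add_done_callback(lambda _f: slots.release())
```

Because the permit is taken before `source.get()`, at most `concurrency` messages are ever off the queue and not yet finished.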
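Honestly, I'm surprised this isn't built into `concurrent.futures`, since I would have expected the pattern of pulling from a queue and submitting to a thread pool executor to be fairly common.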
One approach is to track the currently running tasks in a `set` of `Future`s:
```python
import concurrent.futures
import time

active_threads = set()

def pop_future(future):
    # Called when a future finishes: drop it from the set of running futures
    active_threads.discard(future)

with concurrent.futures.ThreadPoolExecutor(max_workers=CONCURRENCY) as executor:
    while True:
        while len(active_threads) >= CONCURRENCY:
            time.sleep(0.1)  # or whatever
        message = pull_from_queue()
        future = executor.submit(do_work_for_message, message)
        active_threads.add(future)
        future.add_done_callback(pop_future)
```
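The `time.sleep(0.1)` loop above can be replaced by `concurrent.futures.wait(..., return_when=FIRST_COMPLETED)`, which blocks until at least one pending future finishes. A minimal sketch, assuming a `queue.Queue` named `source` in place of `pull_from_queue()` and `None` as a hypothetical stop sentinel. Since only the main loop touches `pending`, no done callback or lock is needed.

```python
import concurrent.futures
import queue


def run(source: queue.Queue, do_work_for_message, concurrency: int = 4) -> None:
    """Set-of-futures approach without sleep-polling.

    Hypothetical helper; a `None` in `source` is an assumed stop sentinel.
    """
    pending = set()
    with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor:
        while True:
            if len(pending) >= concurrency:
                # Blocks until at least one running future has finished.
                done, pending = concurrent.futures.wait(
                    pending, return_when=concurrent.futures.FIRST_COMPLETED
                )
                for f in done:
                    f.result()  # re-raise any worker exception in the main loop
            message = source.get()  # stands in for pull_from_queue()
            if message is None:
                break
            pending.add(executor.submit(do_work_for_message, message))
```

Calling `f.result()` on finished futures surfaces worker failures in the producer loop; drop that line if failed tasks should simply be ignored.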
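A more sophisticated approach would be to make the `done_callback` the thing that triggers the next pull from the queue, instead of polling and blocking. But if the workers manage to get ahead of the queue, you then need to fall back to polling the queue.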
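A literal `add_done_callback` version has a catch: if a future has already finished when the callback is attached, the callback runs immediately in the attaching thread, so a callback that pulls, submits and attaches again can recurse. The sketch below swaps in a close variant: each task pulls its own message and, when it finishes, submits the next pull, and a blocking `get()` covers the case where the workers get ahead of the queue. `run_self_scheduling`, the `queue.Queue` source and the `None` stop sentinel are assumptions.

```python
import concurrent.futures
import queue
import threading


def run_self_scheduling(source: queue.Queue, do_work_for_message, concurrency: int) -> None:
    """Each task pulls one message and, when done, schedules the next pull.

    Hypothetical helper; a `None` in `source` is an assumed stop sentinel.
    """
    chains_done = threading.Semaphore(0)

    with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor:

        def pull_and_work() -> None:
            # Blocking get(): if the workers are ahead of the queue, they wait
            # here instead of spinning in a polling loop.
            message = source.get()
            if message is None:
                source.put(None)       # let the other chains see the sentinel too
                chains_done.release()  # this chain ends here
                return
            try:
                do_work_for_message(message)
            finally:
                # Finishing a task is what triggers the next pull. An exception
                # from do_work_for_message is stored in this task's future.
                executor.submit(pull_and_work)

        for _ in range(concurrency):
            executor.submit(pull_and_work)
        # Every chain must stop before shutdown, otherwise its final submit()
        # would raise RuntimeError.
        for _ in range(concurrency):
            chains_done.acquire()
```

Messages are only ever pulled from inside a worker, so a message leaves the queue only when a thread is free to process it.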
Based on @Samwise's answer (https://stackoverflow.com/a/73396000/8388869), I extended `ThreadPoolExecutor`:
```python
import time
from concurrent.futures import Future, ThreadPoolExecutor


class AvailableThreadPoolExecutor(ThreadPoolExecutor):
    """ThreadPoolExecutor that keeps track of the number of available workers.

    Refs:
        inspired by https://stackoverflow.com/a/73396000/8388869
    """

    def __init__(
        self, max_workers=None, thread_name_prefix="", initializer=None, initargs=()
    ):
        super().__init__(max_workers, thread_name_prefix, initializer, initargs)
        self._running_worker_futures: set[Future] = set()

    @property
    def available_workers(self) -> int:
        """the number of available workers"""
        return self._max_workers - len(self._running_worker_futures)

    def wait_for_available_worker(self, timeout: float | None = None) -> None:
        """wait until there is an available worker

        Args:
            timeout: the maximum time to wait in seconds. If None, wait indefinitely.

        Raises:
            TimeoutError: if the timeout is reached.
        """
        start_time = time.monotonic()
        while True:
            if self.available_workers > 0:
                return
            if timeout is not None and time.monotonic() - start_time > timeout:
                raise TimeoutError
            time.sleep(0.1)

    def submit(self, fn, /, *args, **kwargs):
        f = super().submit(fn, *args, **kwargs)
        self._running_worker_futures.add(f)
        f.add_done_callback(self._running_worker_futures.remove)
        return f
```
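The `time.sleep(0.1)` loop in `wait_for_available_worker` can be replaced by a `threading.Condition` that is notified whenever a task finishes. The sketch below is a hypothetical drop-in variant with the same method names. It keeps its own counter (so it requires an explicit `max_workers`) instead of reading the private `_max_workers` attribute, and it guards that counter with the condition's lock, since done callbacks usually run in worker threads.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor


class AvailableThreadPoolExecutor(ThreadPoolExecutor):
    """Sketch: waiters are woken by a Condition instead of sleeping in 0.1 s steps."""

    def __init__(self, max_workers: int, thread_name_prefix: str = "",
                 initializer=None, initargs=()):
        super().__init__(max_workers, thread_name_prefix, initializer, initargs)
        self._capacity = max_workers  # assumption: caller passes an explicit int
        self._running = 0
        self._cond = threading.Condition()

    @property
    def available_workers(self) -> int:
        with self._cond:
            return self._capacity - self._running

    def wait_for_available_worker(self, timeout=None) -> None:
        """Block until a worker is free; raise TimeoutError after `timeout` seconds."""
        with self._cond:
            if not self._cond.wait_for(lambda: self._running < self._capacity, timeout):
                raise TimeoutError

    def submit(self, fn, /, *args, **kwargs) -> Future:
        with self._cond:
            self._running += 1
        try:
            future = super().submit(fn, *args, **kwargs)
        except BaseException:
            self._task_finished()  # undo the increment if submit() fails
            raise
        future.add_done_callback(self._task_finished)
        return future

    def _task_finished(self, _future=None) -> None:
        # Runs when a task finishes: free its slot and wake any waiters.
        with self._cond:
            self._running -= 1
            self._cond.notify_all()
```

Counting at `submit()` time rather than when a thread actually starts the task means `available_workers` never over-reports capacity, which is what the producer loop needs.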
It should be used like this:
```python
with AvailableThreadPoolExecutor(max_workers=CONCURRENCY) as executor:
    while True:
        executor.wait_for_available_worker()
        message = pull_from_queue()
        executor.submit(do_work_for_message, message)
```