我正在消耗一个队列,需要将数据传递到线程池进行处理,但我不想消耗比线程池可以处理的更多的项目。换句话说,我希望线程池能够调节队列被消耗的速率。
使用Python的默认
ThreadPool
,apply_async()
函数会立即返回,显然线程池的任务队列可以无限制地增长——这正是我想要避免的。 ThreadPoolExecutor
的 ˙submit()
功能似乎也有同样的作用。
有没有办法使用标准库或第三方模块来完成此操作,或者我是否必须使用从队列中提取记录的常规线程来完成此操作?
您可以通过使用正确初始化的
threading.Semaphore
实例来控制当前可以由池线程/进程执行的任务数,或者坐在池的输入队列中等待执行的任务数(如果池大小为N,那么我的偏好是让所有 N 个池进程执行已提交的任务,并且当池线程/进程变得空闲时,池的输入队列中的额外 N 个任务可以立即运行,而无需首先等待新任务提交)。
例如,如果池大小为 8,我们希望用 16 或更大的计数来初始化信号量(同样,这是我的偏好,但您可以将该值设置为允许总共 N 个任务执行的任何值 N)当前正在执行或位于输入队列中等待执行)。以下代码使用多处理池,但与多线程池同样有效:
from multiprocessing import Pool, cpu_count
from threading import Semaphore
def generate_args() -> int:
for arg in range(0, 1_000):
yield arg
def worker(x: int) -> int:
"""Square the input."""
return x * x
def main() -> None:
results = []
pool_size = cpu_count()
# This allows for each pool process/thread to be running a task
# and to have the input queue have a task for each process/thread
# to immediately run when it becomes idle:
semaphore = Semaphore(2 * pool_size)
def callback(result: int) -> None:
results.append(result)
semaphore.release() # Allow another task to be submitted
pool = Pool(pool_size)
for x in generate_args():
semaphore.acquire()
pool.apply_async(worker, args=(x,), callback=callback)
# Wait for all tasks to complete:
pool.close()
pool.join()
print(len(results))
if __name__ == '__main__':
main()
如果您希望有可重用的类来执行此操作,那么:
import multiprocessing.pool
import multiprocessing
import threading
class BoundedQueuePool:
def __init__(self, semaphore):
self._semaphore = semaphore
def _release(self, result, callback=None):
self._semaphore.release()
if callback:
callback(result)
def apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None):
self._semaphore.acquire()
callback_fn = self._release if callback is None else lambda result: self._release(result, callback=callback)
error_callback_fn = self._release if error_callback is None else lambda result: self._release(result, callback=callback)
return super().apply_async(func, args, kwds, callback=callback_fn, error_callback=error_callback_fn)
class BoundedQueueProcessPool(BoundedQueuePool, multiprocessing.pool.Pool):
def __init__(self, *args, max_waiting_tasks=None, **kwargs):
multiprocessing.pool.Pool.__init__(self, *args, **kwargs)
limit = kwargs['processes'] if 'processes' in kwargs else (args[0] if args else multiprocessing.cpu_count())
BoundedQueuePool.__init__(self, threading.BoundedSemaphore(limit))
class BoundedQueueThreadPool(BoundedQueuePool, multiprocessing.pool.ThreadPool):
def __init__(self, *args, max_waiting_tasks=None, **kwargs):
multiprocessing.pool.ThreadPool.__init__(self, *args, **kwargs)
limit = kwargs['processes'] if 'processes' in kwargs else (args[0] if args else multiprocessing.cpu_count())
BoundedQueuePool.__init__(self, threading.BoundedSemaphore(limit))