如果池中的所有线程都忙,有没有办法阻止在Python线程池中应用任务?

问题描述 投票:0回答:1

我正在消耗一个队列,需要将数据传递到线程池进行处理,但我不想消耗比线程池可以处理的更多的项目。换句话说,我希望线程池能够调节队列被消耗的速率。

使用Python的默认

ThreadPool
apply_async()
函数会立即返回,显然线程池的任务队列可以无限制地增长——这正是我想要避免的。
ThreadPoolExecutor
˙submit()
功能似乎也有同样的作用。

有没有办法使用标准库或第三方模块来完成此操作,或者我是否必须使用从队列中提取记录的常规线程来完成此操作?

python python-multithreading
1个回答
0
投票

您可以通过使用正确初始化的

threading.Semaphore
实例来控制当前可以由池线程/进程执行的任务数,或者坐在池的输入队列中等待执行的任务数(如果池大小为N,那么我的偏好是让所有 N 个池进程执行已提交的任务,并且当池线程/进程变得空闲时,池的输入队列中的额外 N 个任务可以立即运行,而无需首先等待新任务提交)。

例如,如果池大小为 8,我们希望用 16 或更大的计数来初始化信号量(同样,这是我的偏好,但您可以将该值设置为允许总共 N 个任务执行的任何值 N)当前正在执行或位于输入队列中等待执行)。以下代码使用多处理池,但与多线程池同样有效:

from multiprocessing import Pool, cpu_count
from threading import Semaphore

def generate_args() -> int:
    for arg in range(0, 1_000):
        yield arg

def worker(x: int) -> int:
    """Square the input."""

    return x * x

def main() -> None:
    results = []
    pool_size = cpu_count()
    # This allows for each pool process/thread to be running a task
    # and to have the input queue have a task for each process/thread
    # to immediately run when it becomes idle:
    semaphore = Semaphore(2 * pool_size)

    def callback(result: int) -> None:
        results.append(result)
        semaphore.release()  # Allow another task to be submitted

    pool = Pool(pool_size)

    for x in generate_args():
        semaphore.acquire()
        pool.apply_async(worker, args=(x,), callback=callback)

    # Wait for all tasks to complete:
    pool.close()
    pool.join()

    print(len(results))

if __name__ == '__main__':
    main()

如果您希望有可重用的类来执行此操作,那么:

import multiprocessing.pool
import multiprocessing
import threading

class BoundedQueuePool:
    def __init__(self, semaphore):
        self._semaphore = semaphore

    def _release(self, result, callback=None):
        self._semaphore.release()
        if callback:
            callback(result)

    def apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None):
        self._semaphore.acquire()
        callback_fn = self._release if callback is None else lambda result: self._release(result, callback=callback)
        error_callback_fn = self._release if error_callback is None else lambda result: self._release(result, callback=callback)
        return super().apply_async(func, args, kwds, callback=callback_fn, error_callback=error_callback_fn)

class BoundedQueueProcessPool(BoundedQueuePool, multiprocessing.pool.Pool):
    def __init__(self, *args, max_waiting_tasks=None, **kwargs):
        multiprocessing.pool.Pool.__init__(self, *args, **kwargs)
        limit = kwargs['processes'] if 'processes' in kwargs else (args[0] if args else multiprocessing.cpu_count())
        BoundedQueuePool.__init__(self, threading.BoundedSemaphore(limit))

class BoundedQueueThreadPool(BoundedQueuePool, multiprocessing.pool.ThreadPool):
    def __init__(self, *args, max_waiting_tasks=None, **kwargs):
        multiprocessing.pool.ThreadPool.__init__(self, *args, **kwargs)
        limit = kwargs['processes'] if 'processes' in kwargs else (args[0] if args else multiprocessing.cpu_count())
        BoundedQueuePool.__init__(self, threading.BoundedSemaphore(limit))
© www.soinside.com 2019 - 2024. All rights reserved.