具有限制队列的Python ProcessPoolExecutor

问题描述 投票:0回答:1

是否有一种方法让ProcessPoolExecutor在所有工作人员都忙时有一个有限的队列来处理传入的请求?

在文档中,没有解释如果调用commit()并且所有工作人员都忙时会发生什么。但是,我进行了一些研究,结果发现ProcessPoolExecutor有自己的内部Queue,它显然是不受限制的。通常,使用未绑定队列不是一个好习惯,因为执行器可以使系统崩溃(DoS攻击)。如果“ some_function”花费太多时间运行并接收到大尺寸的args,类似的事情很容易使系统崩溃。

with ProcessPoolExecutor(max_workers=5) as executor:
    for arg in range(10000000000000):
        future = executor.submit(some_function, args)

我想知道是否有一种方法可以限制内部队列的大小,或者是否可以使用外部队列?

python python-3.x multiprocessing python-multiprocessing concurrent.futures
1个回答
0
投票

正如我的问题中提到的,ProcessPoolExecutor具有自己的内部队列,该队列不受限制。但是,ProcessPoolExecutor._queue_count计算活动请求的数量(运行+待处理)。

对我来说是有界的,只需简单地在ProcessPoolExecutor之上创建一个包装器,以检查计数器并在数量超出所需的最大队列大小时抛出一些运行时异常:

    self._max_queue_size = self._max_workers + max_queue_size 

然后:

def submit(self, fn, *args, **kwargs) -> Future:
    if self._executor._queue_count >= self._max_queue_size:
        raise RuntimeError(
            f"{self.__class__.__name__} has reached its maximum of "
            f"{self._max_queue_size} active (running + queued) requests.")
    return self._executor.submit(fn, *args, **kwargs)

可能不是最好或最干净的解决方案,但肯定对我有用。

© www.soinside.com 2019 - 2024. All rights reserved.