Multiprocessing Process Pool Executor 阻塞提交功能

问题描述 投票:0回答:1

我正在尝试将我的

executor.submit
函数调用转换为阻塞函数,以便它一直等到 ProcessPoolExecutor 池有可用的工作人员。

换句话说,最初它应该打印 1,2,然后等待 5 秒,打印 3,4,等等。

我怎样才能做到这一点?

import concurrent.futures
import multiprocessing
import time

def wait_f():
    time.sleep(5)
    return 1



if __name__ == '__main__':
    multiprocessing.freeze_support()
    global_results = []
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        futures = []
        for j in range(10):
            future = executor.submit(wait_f)
            futures.append(future)
            print(j)
        for future in concurrent.futures.as_completed(futures):
            result = future.result()
            global_results.append(result)
python process multiprocessing blocking
1个回答
0
投票

请记住,工作池模式旨在为您处理并发,因此您不必这样做。换句话说,你不必担心安排的任务比工作人员多,因为 Pool 已经为你优化了任务流程。

这就是说,您可以从 this 回答一个可行的解决方案。

from threading import Semaphore
from concurrent.futures import ProcessPoolExecutor

class TaskManager():
    def __init__(self, processes):
        self.workers = Semaphore(processes)
        self.executor = ProcessPoolExecutor(max_workers=processes)

    def new_task(self, function):
        """Start a new task, blocks if queue is full."""
        self.workers.acquire()
        future = self.executor.submit(function)
        future.add_done_callback(self.task_done)

    def task_done(self):
        """Called once task is done, releases the queue if blocked."""
        self.workers.release()


task_manager = TaskManager(2)
task_manager.new_task(wait_f)
© www.soinside.com 2019 - 2024. All rights reserved.