我正在尝试将我的
executor.submit
函数调用转换为阻塞函数,以便它一直等到 ProcessPoolExecutor 池有可用的工作人员。
换句话说,最初它应该打印 1,2,然后等待 5 秒,打印 3,4,等等。
我怎样才能做到这一点?
import concurrent.futures
import multiprocessing
import time
def wait_f():
time.sleep(5)
return 1
if __name__ == '__main__':
multiprocessing.freeze_support()
global_results = []
with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
futures = []
for j in range(10):
future = executor.submit(wait_f)
futures.append(future)
print(j)
for future in concurrent.futures.as_completed(futures):
result = future.result()
global_results.append(result)
请记住,工作池模式旨在为您处理并发,因此您不必这样做。换句话说,你不必担心安排的任务比工作人员多,因为 Pool 已经为你优化了任务流程。
这就是说,您可以从 this 回答一个可行的解决方案。
from threading import Semaphore
from concurrent.futures import ProcessPoolExecutor
class TaskManager():
def __init__(self, processes):
self.workers = Semaphore(processes)
self.executor = ProcessPoolExecutor(max_workers=processes)
def new_task(self, function):
"""Start a new task, blocks if queue is full."""
self.workers.acquire()
future = self.executor.submit(function)
future.add_done_callback(self.task_done)
def task_done(self):
"""Called once task is done, releases the queue if blocked."""
self.workers.release()
task_manager = TaskManager(2)
task_manager.new_task(wait_f)