其想法是运行一组任务,在完成后处理它们,一旦返回所需的结果,就取消剩余的任务。我不介意等待已经开始的任务完成,但应该禁止开始新的任务。
我尝试过拨打
pool.shutdown(cancel_futures=True)
,但这里似乎不起作用:
import time
import multiprocessing
import concurrent.futures
import random
filelist = ['001', '002', '003', '004', '005', '006', '007', '008', '009', '010']
parallel = 2
def worker(name, lock, index):
with lock:
index.value += 1
pc = index.value*10
print(f'starting: {name} ({pc})')
d = random.randrange(1, 5)
time.sleep(d)
return pc == 60
print('INIT')
with multiprocessing.Manager() as manager:
lock = manager.Lock()
index = manager.Value('b', 0)
if parallel > 0:
with concurrent.futures.ProcessPoolExecutor(max_workers=parallel) as pool:
tasks = [pool.submit(worker, filename, lock, index) for filename in filelist]
for task in concurrent.futures.as_completed(tasks):
rc = task.result()
if rc:
print('Found!')
pool.shutdown(cancel_futures=True)
break
else:
for filename in filelist:
rc = worker(filename, lock, index)
if rc:
print('Found!')
break
print('END')
以串行模式运行时(
parallel = 0
),我得到了期望的结果:
INIT
starting: 001 (10)
starting: 002 (20)
starting: 003 (30)
starting: 004 (40)
starting: 005 (50)
starting: 006 (60)
Found!
END
但是激活并行后,它只会继续到最后:
INIT
starting: 001 (10)
starting: 002 (20)
starting: 003 (30)
starting: 004 (40)
starting: 005 (50)
starting: 006 (60)
Found!
starting: 007 (70)
starting: 008 (80)
starting: 009 (90)
END
那么,有没有办法退出
as_completed
循环呢?
是的。例如,您可以使用
Pool
模块中的 multiprocessing
类来完成此操作
示例:
from functools import partial
import multiprocessing as mp
import time
def child_process(x, stop_event):
# stopping condition
if x == 4:
print('stop event is set')
stop_event.set()
# do something else
time.sleep(1)
if __name__ == "__main__":
print("main process start")
# Create exit event
# Создание пула процессов
pool = mp.Pool(2)
print('pool started')
start = time.perf_counter()
pool.map_async(partial(child_process, stop_event=exit_event), range(20))
while 1:
if exit_event.is_set():
# close pool
pool.terminate()
pool.join()
break
print(f'pool interrupted. Time took: {time.perf_counter() - start:.2f} s.')
# do something
出:
main process start
pool started
stop event is set
pool interrupted. Time took: 1.17 s.