我可以退出“as_completed”池执行器循环(取消所有剩余任务)吗?

问题描述 投票:0回答:1

其想法是运行一组任务,在完成后处理它们,一旦返回所需的结果,就取消剩余的任务。我不介意等待已经开始的任务完成,但应该禁止开始新的任务。

我尝试过拨打

pool.shutdown(cancel_futures=True)
,但这里似乎不起作用:

import time
import multiprocessing
import concurrent.futures
import random

filelist = ['001', '002', '003', '004', '005', '006', '007', '008', '009', '010']
parallel = 2

def worker(name, lock, index):
    with lock:
        index.value += 1
        pc = index.value*10
        print(f'starting: {name} ({pc})')
    d = random.randrange(1, 5)
    time.sleep(d)
    return pc == 60

print('INIT')

with multiprocessing.Manager() as manager:
    lock = manager.Lock()
    index = manager.Value('b', 0)
    if parallel > 0:
        with concurrent.futures.ProcessPoolExecutor(max_workers=parallel) as pool:
            tasks = [pool.submit(worker, filename, lock, index) for filename in filelist]
            for task in concurrent.futures.as_completed(tasks):
                rc = task.result()
                if rc:
                    print('Found!')
                    pool.shutdown(cancel_futures=True)
                    break
      else:
          for filename in filelist:
              rc = worker(filename, lock, index)
              if rc:
                  print('Found!')
                  break

print('END')

以串行模式运行时(

parallel = 0
),我得到了期望的结果:

INIT
starting: 001 (10)
starting: 002 (20)
starting: 003 (30)
starting: 004 (40)
starting: 005 (50)
starting: 006 (60)
Found!
END

但是激活并行后,它只会继续到最后:

INIT
starting: 001 (10)
starting: 002 (20)
starting: 003 (30)
starting: 004 (40)
starting: 005 (50)
starting: 006 (60)
Found!
starting: 007 (70)
starting: 008 (80)
starting: 009 (90)
END

那么,有没有办法退出

as_completed
循环呢?

python python-multiprocessing concurrent.futures
1个回答
0
投票

是的。例如,您可以使用

Pool
模块中的
multiprocessing
类来完成此操作 示例:

from functools import partial
import multiprocessing as mp
import time


def child_process(x, stop_event):
    # stopping condition
    if x == 4:
        print('stop event is set')
        stop_event.set()
    # do something else
    time.sleep(1)


if __name__ == "__main__":
    print("main process start")

    # Create exit event

    # Создание пула процессов
    pool = mp.Pool(2)
    print('pool started')
    start = time.perf_counter()
    pool.map_async(partial(child_process, stop_event=exit_event), range(20))
    while 1:
        if exit_event.is_set():
            # close pool
            pool.terminate()
            pool.join()
            break
    print(f'pool interrupted. Time took: {time.perf_counter() - start:.2f} s.')
    # do something

出:

main process start
pool started
stop event is set
pool interrupted. Time took: 1.17 s.
© www.soinside.com 2019 - 2024. All rights reserved.