我需要将一个函数并行应用于大型生成器中的所有项目。我不想缓冲回报,它只是一个巨大的None
阵列。
我不明白如何使用concurrent.future
API执行此操作。
将函数应用于生成器中的项目,通过具有工作程序上限的线程池,丢弃结果,这是最简单,最有效的方法是什么?
好的,这就是我做的。鉴于以下定义:
from concurrent import futures
import itertools
func = print # any function
tasks = iter(iterable) # any one-time-only iterator
workers = 10
这将func
映射到tasks
,带有工人上限,丢弃结果:
with futures.ThreadPoolExecutor(max_workers = workers) as pool:
initial = itertools.islice(tasks, workers)
running = set(pool.submit(func, task) for task in initial)
for task in tasks:
done, _ = futures.wait(running, return_when = futures.FIRST_COMPLETED)
running -= done
running.add(pool.submit(func, task))
如果您希望扩展迭代就像状态机的多个对象的实现,您可以使用我编写的程序。
2个背景。一个文件读取任务(2个不同的对象代表2个状态机)一个上下文保存线程池,任何时候未来完成它使用线程安全队列来通知它已完成,这会导致next()要调用的方法并继续下一次迭代。
当对象达到“完成”状态时,它不再返回池中。