import multiprocessing as mp
pool = mp.Pool()
def calc(i):
return i * 2
def done(results):
for result in results:
print(result)
def loop():
pool.map_async(calc, [0, 1, 2, 3], callback = done):
while True:
loop()
我正在使用一个线程池设置,如这个简化示例中所述。它按预期工作:主循环要求池在每次运行时通过
map_async
计算一组结果,池使用 calc
函数来处理给定的项目,准备好后,将调用 done
函数包含所有结果的列表,在本例中始终为 [0, 2, 4, 6]
。
我想要实现的目标:我希望线程池在任何结果完成处理时运行回调函数,或者将其添加到主线程可以随时查看和修改的列表中。在计算出最后一个结果后,池子停止调用回调,直到主循环再次触发它并重复该过程。
目标是让工作线程尽可能快地发布处理后的结果,让主线程在执行时找到尽可能多的结果,即使它们还没有准备好。使用
result.get()
不是一个选项,因为获取结果不得阻塞主线程,如果您想在同步模式下运行,这将作为一个可选设置。我可以使用池中定义的回调,或者将每个完成的结果添加到数组中,只要主循环可以看到并处理其中的完成项目,然后将其丢弃。
值得注意的是,主线程并不强制将数组交给线程池,
[0, 1, 2, 3]
是类中的常量变量,但calc
函数工作的其他变量需要在每次调用时从主线程更新。仍然将一个项目分配给每个线程...例如,我的数组假设有 4 个进程,每个进程的任务是计算其中一个数字。我不希望它完全按照我的想象工作,但请让我知道最接近的。
map_async
的回调函数仅在所有结果准备就绪时调用。
如果您希望函数返回后立即调用回调函数,您可以迭代地将作业提交到
apply_async
:
import multiprocessing as mp
def calc(i):
return i * 2
def done(result):
print(result)
def main():
pool = mp.Pool()
while True:
for i in [0, 1, 2, 3]:
pool.apply_async(calc, (i,), callback=done)
if __name__ == '__main__':
main()