我对“concurrent.futures”并行处理相当新,我正在测试一些简单的实验。我写的代码似乎有效,但我不确定如何存储结果。我试图创建一个列表(“期货”)并将结果附加到该列表,但这大大减慢了程序。我想知道是否有更好的方法来做到这一点。谢谢。
import concurrent.futures
import time
couple_ods= []
futures=[]
dtab={}
for i in range(100):
for j in range(100):
dtab[i,j]=i+j/2
couple_ods.append((i,j))
avg_speed=100
def task(i):
origin=i[0]
destination=i[1]
time.sleep(0.01)
distance=dtab[origin,destination]/avg_speed
return distance
start1=time.time()
def main():
with concurrent.futures.ThreadPoolExecutor() as executor:
for number in couple_ods:
future=executor.submit(task,number)
futures.append(future.result())
if __name__ == '__main__':
main()
end1=time.time()
当你调用future.result()
时,它会一直阻塞,直到值准备就绪。所以,你没有从并行性中获得任何好处 - 你开始一项任务,等待它完成,启动另一项任务,等待它完成,等等。
当然,您的示例首先不会受益于线程。你的任务除了CPU绑定的Python计算之外什么都不做,这意味着(至少在CPython,MicroPython和PyPy中,这是concurrent.futures
附带的唯一完整实现),GIL(全局解释器锁)将阻止不止一个你的线程一次进展。
希望你真正的计划是不同的。如果它正在进行I / O绑定的东西(发出网络请求,读取文件等),或者使用像NumPy这样的扩展库来释放围绕繁重的CPU工作的GIL,那么它将正常工作。但除此之外,你会想在这里使用ProcessPoolExecutor
。
无论如何,你想要做的是将future
本身附加到一个列表中,这样你就可以在等待其中任何一个之前得到所有期货的清单:
for number in couple_ods:
future=executor.submit(task,number)
futures.append(future)
然后,在您开始所有工作后,您可以开始等待它们。当您需要更多控制时,有三个简单的选项,一个很复杂。
(1)你可以直接循环遍历它们,按照它们提交的顺序等待它们:
for future in futures:
result = future.result()
dostuff(result)
(2)如果你需要在做任何工作之前等待它们全部完成,你可以打电话给wait
:
futures, _ = concurrent.futures.wait(futures)
for future in futures:
result = future.result()
dostuff(result)
(3)如果你想在每个人准备好后立即处理它们,即使它们出现故障,也要使用as_completed
:
for result in concurrent.futures.as_completed(futures):
dostuff(result)
请注意,在文档中使用此函数的示例提供了一些方法来标识已完成的任务。如果你需要它,它可以像传递每个索引一样简单,然后return index, real_result
,然后你可以for index, result in …
循环。
(4)如果你需要更多的控制,你可以循环wait
ing到目前为止做的任何事情:
while futures:
done, futures = concurrent.futures.wait(concurrent.futures.FIRST_COMPLETED)
for future in done:
result = future.result()
dostuff(result)
这个例子与as_completed
做同样的事情,但是你可以在它上面做一些小变化来做不同的事情,比如等待一切都要完成但是如果有什么事情引发异常就提前取消。
对于许多简单的情况,您可以使用执行程序的map
方法来简化第一个选项。这就像内置的map
函数一样,为参数中的每个值调用一次函数,然后给你一些你可以循环以获得相同顺序的结果的东西,但它并行执行。所以:
for result in executor.map(task, couple_ods):
dostuff(result)