存储结果ThreadPoolExecutor

问题描述 投票:1回答:1

我对“concurrent.futures”并行处理相当新,我正在测试一些简单的实验。我写的代码似乎有效,但我不确定如何存储结果。我试图创建一个列表(“期货”)并将结果附加到该列表,但这大大减慢了程序。我想知道是否有更好的方法来做到这一点。谢谢。

import concurrent.futures
import time

couple_ods= []
futures=[]

dtab={}
for i in range(100):
    for j in range(100):
       dtab[i,j]=i+j/2
       couple_ods.append((i,j))

avg_speed=100
def task(i):
    origin=i[0]
    destination=i[1]
    time.sleep(0.01)
    distance=dtab[origin,destination]/avg_speed
    return distance
start1=time.time()
def main():
    with concurrent.futures.ThreadPoolExecutor() as executor:
       for number in couple_ods:
          future=executor.submit(task,number)
          futures.append(future.result())

if __name__ == '__main__':
    main()
end1=time.time()
python python-multithreading concurrent.futures
1个回答
4
投票

当你调用future.result()时,它会一直阻塞,直到值准备就绪。所以,你没有从并行性中获得任何好处 - 你开始一项任务,等待它完成,启动另一项任务,等待它完成,等等。

当然,您的示例首先不会受益于线程。你的任务除了CPU绑定的Python计算之外什么都不做,这意味着(至少在CPython,MicroPython和PyPy中,这是concurrent.futures附带的唯一完整实现),GIL(全局解释器锁)将阻止不止一个你的线程一次进展。

希望你真正的计划是不同的。如果它正在进行I / O绑定的东西(发出网络请求,读取文件等),或者使用像NumPy这样的扩展库来释放围绕繁重的CPU工作的GIL,那么它将正常工作。但除此之外,你会想在这里使用ProcessPoolExecutor


无论如何,你想要做的是将future本身附加到一个列表中,这样你就可以在等待其中任何一个之前得到所有期货的清单:

for number in couple_ods:
    future=executor.submit(task,number)
    futures.append(future)

然后,在您开始所有工作后,您可以开始等待它们。当您需要更多控制时,有三个简单的选项,一个很复杂。


(1)你可以直接循环遍历它们,按照它们提交的顺序等待它们:

for future in futures:
    result = future.result()
    dostuff(result)

(2)如果你需要在做任何工作之前等待它们全部完成,你可以打电话给wait

futures, _ = concurrent.futures.wait(futures)
for future in futures:
    result = future.result()
    dostuff(result)

(3)如果你想在每个人准备好后立即处理它们,即使它们出现故障,也要使用as_completed

for result in concurrent.futures.as_completed(futures):
    dostuff(result)

请注意,在文档中使用此函数的示例提供了一些方法来标识已完成的任务。如果你需要它,它可以像传递每个索引一样简单,然后return index, real_result,然后你可以for index, result in …循环。

(4)如果你需要更多的控制,你可以循环waiting到目前为止做的任何事情:

while futures:
    done, futures = concurrent.futures.wait(concurrent.futures.FIRST_COMPLETED)
    for future in done:
        result = future.result()
        dostuff(result)

这个例子与as_completed做同样的事情,但是你可以在它上面做一些小变化来做不同的事情,比如等待一切都要完成但是如果有什么事情引发异常就提前取消。


对于许多简单的情况,您可以使用执行程序的map方法来简化第一个选项。这就像内置的map函数一样,为参数中的每个值调用一次函数,然后给你一些你可以循环以获得相同顺序的结果的东西,但它并行执行。所以:

for result in executor.map(task, couple_ods):
    dostuff(result)
© www.soinside.com 2019 - 2024. All rights reserved.