我想并行运行的功能多files.and希望所有的人都点之前终止。
对于实施例:有一个环
def main(): for item in list: function_x(item) function_y(list)
现在我想的是,这function_x应该在所有项目并行运行。但是,这个功能应该可以完成所有的项目我的function_y被调用之前。我打算用芹菜这一点。但不知道如何做到这一点。
这是我最后的测试代码。
所有我需要做的是使用多库。
from multiprocessing import Process
from time import sleep
Pros = []
def function_x(i):
for j in range(0,5):
sleep(3)
print i
def function_y():
print "done"
def main():
for i in range(0,3):
print "Thread Started"
p = Process(target=function_x, args=(i,))
Pros.append(p)
p.start()
# block until all the threads finish (i.e. block until all function_x calls finish)
for t in Pros:
t.join()
function_y()
你可以使用线程这一点。 thread.join
是你需要的功能,该功能块直到线程完成。
你可以这样做:
import threading
threads = []
def main():
for item in list:
t = threading.Thread(target=function_x, args=(item,))
threads.append(t)
t.start()
# block until all the threads finish (i.e. until all function_a functions finish)
for t in threads:
t.join()
function_y(list)
你可以用Ray,这对于编写并行和分布式的Python库典雅的做到这一点。
简单地声明与function_x
的@ray.remote
,然后它可以并行通过用function_x.remote
调用它被执行,并且结果可与ray.get
检索。
import ray
import time
ray.init()
@ray.remote
def function_x(item):
time.sleep(1)
return item
def function_y(list):
pass
list = [1, 2, 3, 4]
# Process the items in parallel.
results = ray.get([function_x.remote(item) for item in list])
function_y(list)
Here is the documentation for celery groups,这是我想你想要的。使用AsyncResult.get()
代替AsyncResult.ready()
的阻拦。
#!/bin/env python
import concurrent.futures
def function_x(item):
return item * item
def function_y(lst):
return [x * x for x in lst]
a_list = range(10)
if __name__ == '__main__':
with concurrent.futures.ThreadPoolExecutor(10) as tp:
future_to_function_x = {
tp.submit(function_x, item): item
for item in a_list
}
results = {}
for future in concurrent.futures.as_completed(future_to_function_x):
item = future_to_function_x[future]
try:
res = future.result()
except Exception as e:
print('Exception when processing item "%s": %s' % (item, e))
else:
results[item] = res
print('results:', results)
after = function_y(results.values())
print('after:', after)
输出:
results: {0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25, 6: 36, 7: 49, 8: 64, 9: 81}
after: [0, 1, 16, 81, 256, 625, 1296, 2401, 4096, 6561]