我有一个线程对象,我无法在ProcessPoolExecutor中分发该对象,但希望返回将来。如果我已经有了一个未来,是否有办法适用于它的完成值,例如Future a -> (a -> b) -> Future b
?
import concurrent.futures
import threading
def three(x):
return 2+x
if __name__ == '__main__':
trackedItem = (3, threading.Event())
pool = concurrent.futures.ProcessPoolExecutor(3)
poolJob = (q.submit(three, trackedItem[0]),trackedItem[1]) #(Future(int), Event)
*** something magic goes here ***
#Trying to transform it into Future(int,Event)
这是使用更简单的设置代码的一种方式,没有threading.Event
,因为解决该问题似乎没有必要。基本上,您可以自己将future_b
创建为新的Future()
,并使用add_done_callback
上的future_a
方法设置future_b
的结果。这里,func_a
是计算future_a
的结果的计算,func_b
是使用future_b
的结果计算future_a
的结果的计算。]
from concurrent.futures import ProcessPoolExecutor, Future def func_a(x): return 2 + x def func_b(x): return 10 * x if __name__ == '__main__': pool = ProcessPoolExecutor(3) future_a = pool.submit(func_a, 3) future_b = Future() future_b.set_running_or_notify_cancel() def callback(f): x = f.result() y = func_b(x) future_b.set_result(y) future_a.add_done_callback(callback) print(future_b.result()) # 50
如果要帮助函数执行此操作,可以编写一个:
map_future
接受一个Future和一个映射函数,并根据需要返回新的映射的Future:
def map_future(future_a, func): future_b = Future() future_b.set_running_or_notify_cancel() def callback(f): x = f.result() y = func(x) future_b.set_result(y) future_a.add_done_callback(callback) return future_b
注意:这与
Future
类的文档中的建议背道而驰,它说:
[
Future
实例是由Executor.submit()
创建的,除测试外,不应直接创建。而且,这不会检查
func_a
是否引发异常。如果确实如此,则f.result()
将在回调中引发该异常,并且根据文档,该异常将被“记录并忽略”。因此,您可能需要添加一些异常处理以使其更加健壮。