如何在功能上构成期货?

问题描述 投票:0回答:1

我有一个线程对象,我无法在ProcessPoolExecutor中分发该对象,但希望返回将来。如果我已经有了一个未来,是否有办法适用于它的完成值,例如Future a -> (a -> b) -> Future b

import concurrent.futures
import threading

def three(x):
    return 2+x


if __name__ == '__main__':
    trackedItem = (3, threading.Event())
    pool = concurrent.futures.ProcessPoolExecutor(3)
    poolJob = (q.submit(three, trackedItem[0]),trackedItem[1]) #(Future(int), Event)
    *** something magic goes here ***
    #Trying to transform it into Future(int,Event)
python future python-multithreading functor concurrent.futures
1个回答
0
投票

这是使用更简单的设置代码的一种方式,没有threading.Event,因为解决该问题似乎没有必要。基本上,您可以自己将future_b创建为新的Future(),并使用add_done_callback上的future_a方法设置future_b的结果。这里,func_a是计算future_a的结果的计算,func_b是使用future_b的结果计算future_a的结果的计算。]

from concurrent.futures import ProcessPoolExecutor, Future

def func_a(x):
    return 2 + x

def func_b(x):
    return 10 * x

if __name__ == '__main__':
    pool = ProcessPoolExecutor(3)
    future_a = pool.submit(func_a, 3)

    future_b = Future()
    future_b.set_running_or_notify_cancel()

    def callback(f):
        x = f.result()
        y = func_b(x)
        future_b.set_result(y)

    future_a.add_done_callback(callback)

    print(future_b.result()) # 50

如果要帮助函数执行此操作,可以编写一个:map_future接受一个Future和一个映射函数,并根据需要返回新的映射的Future:

def map_future(future_a, func):
    future_b = Future()
    future_b.set_running_or_notify_cancel()

    def callback(f):
        x = f.result()
        y = func(x)
        future_b.set_result(y)

    future_a.add_done_callback(callback)
    return future_b

注意:这与Future类的文档中的建议背道而驰,它说:

[Future实例是由Executor.submit()创建的,除测试外,不应直接创建。

而且,这不会检查func_a是否引发异常。如果确实如此,则f.result()将在回调中引发该异常,并且根据文档,该异常将被“记录并忽略”。因此,您可能需要添加一些异常处理以使其更加健壮。

© www.soinside.com 2019 - 2024. All rights reserved.