是否可以使用
ThreadPoolExecutor
中的 concurrent.futures
从生成器执行交错到正常方法执行。
import concurrent.futures
from app import executor
def generator():
for val in external_api():
yield val
def func1():
return 1
def func2():
return 2
def stream():
generator_future = executor.submit(generator)
future1 = executor.submit(func1)
future2 = executor.submit(func2)
for future in concurrent.futures.as_completed([generator_future, future1, future2]):
if future == generator_future:
generator_result = future.result()
for value in generator_result:
yield value
else:
yield future.result()
如果开始执行生成器,即使
stream()
或func1
完成,也无法在func2
中产生正常的方法结果。正常方法结果将被保留,直到生成器屈服完成。在生成器屈服期间是否可以产生 func1
和 func2
结果。
注意:在 Flask 应用程序中使用上面的代码片段以及 Flask-Executor 包装器来将应用程序上下文推送到线程。
当您调用生成器函数时,我不关心它的内部有多复杂或生成其所有值需要多长时间,它几乎立即返回一个迭代器
当迭代时执行生成器函数中的代码以获取使用ThreadPoolExecutor
语句生成的所有元素。
下面演示了多线程池返回的是
yield
(但您应该知道,因为您正在迭代此结果):
Iterator
打印:
import concurrent.futures
from collections.abc import Iterator
def generator():
import time
# Simulate an api call:
time.sleep(.5)
for val in range(3, 9):
yield val
def func1():
return 1
def func2():
return 2
def stream():
executor = concurrent.futures.ThreadPoolExecutor(3)
generator_future = executor.submit(generator)
future1 = executor.submit(func1)
future2 = executor.submit(func2)
for future in concurrent.futures.as_completed([generator_future, future1, future2]):
if future == generator_future:
generator_result = future.result()
# Demonstrate that what is returned from calling
# generator() is an iterator:
print(isinstance(generator_result, Iterator))
for value in generator_result:
yield value
else:
yield future.result()
if __name__ == '__main__':
print(list(stream()))
因此,在三个提交的任务中,这三个任务都相对琐碎,第一个提交的任务首先完成,这个任务是对生成器函数的调用。但随后您立即开始迭代此迭代器,这会导致调用您的(可能非常重要的)外部 api,该 API 返回将“产生”的多个值。在检索到所有产生的值之前,您不会检查其他两个提交到池中的任务的完成情况,这将阻止获得任何其他结果。为什么要使用生成器函数?
这意味着对外部 api 的实际调用将由您的主线程执行,而不是在池线程之一中执行。要将 api 调用与其他任务提交重叠,请尝试:
True
[3, 4, 5, 6, 7, 8, 1, 2]
打印:
import concurrent.futures
from collections.abc import Iterable
def call_external_api():
import time
# Simulate an api call:
time.sleep(.5)
# Results from the call to the api:
return [3, 4, 5, 6, 7, 8]
def func1():
return 1
def func2():
return 2
def stream():
executor = concurrent.futures.ThreadPoolExecutor(3)
futures = [
executor.submit(call_external_api),
executor.submit(func1),
executor.submit(func2)
]
for future in concurrent.futures.as_completed(futures):
result = future.result()
if isinstance(result, Iterable):
for elem in result:
yield elem
else:
yield result
if __name__ == '__main__':
print(list(stream()))