生成器执行期间普通Python方法和生成器之间的交错

问题描述 投票:0回答:1

是否可以使用

ThreadPoolExecutor
中的
concurrent.futures
从生成器执行交错到正常方法执行。

import concurrent.futures
from app import executor


def generator():
    for val in external_api():
        yield val


def func1():
    return 1

def func2():
    return 2


def stream():
    generator_future = executor.submit(generator)
    future1 = executor.submit(func1)
    future2 = executor.submit(func2)

    for future in concurrent.futures.as_completed([generator_future, future1, future2]):
        if future == generator_future:
            generator_result = future.result()
            for value in generator_result:
                yield value
        else:
            yield future.result()

如果开始执行生成器,即使

stream()
func1
完成,也无法在
func2
中产生正常的方法结果。正常方法结果将被保留,直到生成器屈服完成。在生成器屈服期间是否可以产生
func1
func2
结果。

注意:在 Flask 应用程序中使用上面的代码片段以及 Flask-Executor 包装器来将应用程序上下文推送到线程。

    

python multithreading flask generator concurrent.futures
1个回答
0
投票

当您调用生成器函数时,我不关心它的内部有多复杂或生成其所有值需要多长时间,它几乎立即返回一个迭代器

当迭代时执行生成器函数中的代码以获取使用

ThreadPoolExecutor 语句生成的所有元素。

 下面演示了多线程池返回的是 
yield(但您应该知道,因为您
正在
迭代此结果): Iterator

打印:

import concurrent.futures from collections.abc import Iterator def generator(): import time # Simulate an api call: time.sleep(.5) for val in range(3, 9): yield val def func1(): return 1 def func2(): return 2 def stream(): executor = concurrent.futures.ThreadPoolExecutor(3) generator_future = executor.submit(generator) future1 = executor.submit(func1) future2 = executor.submit(func2) for future in concurrent.futures.as_completed([generator_future, future1, future2]): if future == generator_future: generator_result = future.result() # Demonstrate that what is returned from calling # generator() is an iterator: print(isinstance(generator_result, Iterator)) for value in generator_result: yield value else: yield future.result() if __name__ == '__main__': print(list(stream()))

因此,在三个提交的任务中,这三个任务都相对琐碎,第一个提交的任务首先完成,这个任务是对生成器函数的调用。但随后您立即开始迭代此迭代器,这会导致调用您的(可能非常重要的)外部 api,该 API 返回将“产生”的多个值。在检索到所有产生的值之前,您不会检查其他两个提交到池中的任务的完成情况,这将阻止获得任何其他结果。

为什么要使用生成器函数?

这意味着对外部 api 的实际调用将由您的主线程执行,而不是在池线程之一中执行。要将 api 调用与其他任务提交重叠,请尝试: True [3, 4, 5, 6, 7, 8, 1, 2]

打印:

import concurrent.futures from collections.abc import Iterable def call_external_api(): import time # Simulate an api call: time.sleep(.5) # Results from the call to the api: return [3, 4, 5, 6, 7, 8] def func1(): return 1 def func2(): return 2 def stream(): executor = concurrent.futures.ThreadPoolExecutor(3) futures = [ executor.submit(call_external_api), executor.submit(func1), executor.submit(func2) ] for future in concurrent.futures.as_completed(futures): result = future.result() if isinstance(result, Iterable): for elem in result: yield elem else: yield result if __name__ == '__main__': print(list(stream()))

© www.soinside.com 2019 - 2024. All rights reserved.