异步同步发电机

问题描述 投票:0回答:1

我有以下情况。

  1. 我有一个阻塞的,同步的发电机
  2. 我有一个非阻塞的异步函数。

我想运行阻塞生成器(在 ThreadPool)和 async 事件循环上的函数。如何实现这个目标?

下面的函数只是简单地打印生成器的输出,而不是打印事件循环的输出。sleep 功能。

谢谢

from concurrent.futures import ThreadPoolExecutor

import numpy as np
import asyncio
import time


def f():
    while True:
        r = np.random.randint(0, 3)
        time.sleep(r)
        yield r


async def gen():
    loop = asyncio.get_event_loop()
    executor = ThreadPoolExecutor()
    gen = await loop.run_in_executor(executor, f)
    for item in gen:
        print(item)
        print('Inside generator')


async def sleep():
    while True:
        await asyncio.sleep(1)
        print('Inside async sleep')


async def combine():
    await asyncio.gather(sleep(), gen())


def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(combine())


if __name__ == '__main__':
    main()
async-await python-asyncio concurrent.futures
1个回答
2
投票

run_in_executor 不能用于生成器,因为它是为阻塞函数设计的。虽然生成器是一个有效的函数,但它在被调用时立即返回,提供一个对象,调用者应该通过重复调用的 next. (这就是 Python 的 for 循环在引擎盖下的作用。) 要从异步代码中使用阻塞生成器,你有两种选择。

  • 将每个 步骤 的迭代(每个单独的对 next)的单独调用中。run_in_executor
  • 开办 for 循环,并使用队列将对象传输给异步消费者。

这两种方法都可以抽象成一个函数,接受一个迭代器并返回一个等价的异步迭代器。这是第二种方法的实现。

import asyncio, threading

def async_wrap_iter(it):
    """Wrap blocking iterator into an asynchronous one"""
    loop = asyncio.get_event_loop()
    q = asyncio.Queue(1)
    exception = None
    _END = object()

    async def yield_queue_items():
        while True:
            next_item = await q.get()
            if next_item is _END:
                break
            yield next_item
        if exception is not None:
            # the iterator has raised, propagate the exception
            raise exception

    def iter_to_queue():
        nonlocal exception
        try:
            for item in it:
                # This runs outside the event loop thread, so we
                # must use thread-safe API to talk to the queue.
                asyncio.run_coroutine_threadsafe(q.put(item), loop).result()
        except Exception as e:
            exception = e
        finally:
            asyncio.run_coroutine_threadsafe(q.put(_END), loop).result()

    threading.Thread(target=iter_to_queue).start()
    return yield_queue_items()

它可以用一个简单的同步迭代器来测试,它使用的是 time.time() 阻止和一个异步心跳函数来证明事件循环正在运行。

# async_wrap_iter definition as above

import time

def test_iter():
    for i in range(5):
        yield i
        time.sleep(1)

async def test():
    ait = async_wrap_iter(test_iter())
    async for i in ait:
        print(i)

async def heartbeat():
    while True:
        print('alive')
        await asyncio.sleep(.1)

async def main():
    asyncio.create_task(heartbeat())
    await test()

asyncio.run(main())
© www.soinside.com 2019 - 2024. All rights reserved.