如何将Python生成器转换为异步生成器?

问题描述 投票:0回答:1

我在 Python 中有一个生成器函数,它是 IO 绑定的。我想将其转换为异步生成器,其中生成器循环在单独的进程或线程中运行。最好,我想使用并发.futures 模块来允许在单独的进程与线程之间灵活选择。解决这个问题的最佳方法是什么?

import time

def blocking():
    """ plain generator with blocking i/o """
    for i in range(10):
        time.sleep(1)
        yield i

def consumer():
    for i in blocking():
        print(i)
python multithreading concurrency python-asyncio generator
1个回答
1
投票

是的,不幸的是,这并不简单。

下面的代码片段中的可选内容是“任务组”,但它有助于良好实践:

import time

import asyncio
from concurrent.futures import ThreadPoolExecutor # (or Process*)

executor = ThreadPoolExecutor(10)  # Put this wherever you like, with as many workers as you want.

def blocking_task(i):
    time.sleep(1)
    return i

async def async_gen():
    """ plain generator with blocking i/o """
    async with asyncio.TaskGroup() as tg:
        loop = asyncio.get_running_loop()
        async def taskwrapper(awaitable):
            return await awaitable
        tasks = {tg.create_task(taskwrapper(loop.run_in_executor(executor, blocking_task, i,))) for i in range(10)}
        while tasks:
            done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                yield task.result()
            tasks = pending


async def consumer():
    async for i in async_gen():
        print(i)


asyncio.run(consumer())
© www.soinside.com 2019 - 2024. All rights reserved.