将采用同步可迭代的同步生成器函数转换为采用异步可迭代的异步生成器函数

问题描述 投票:0回答:1

我有一个无法更改的同步生成器函数(因为它在库中),它本身接受同步迭代。

但我想从异步上下文中使用它,向其传递异步可迭代对象,并且在迭代期间不阻塞事件循环。我该怎么做?

例如,假设我们有

my_gen
,它采用一个可迭代的整数,旨在在同步上下文中工作。

sync_input_iterable = range(0, 10000)
sync_output_iterable = my_gen(input_iterable)
for v in output_iterable:
    print(v)

但我想这样使用它:

async def main():
   # Just for example purposes - there is no "arange" built into Python
   async def arange(start, end):
        for i in range(start, end):
            yield(i)
            await asyncio.sleep(0)

   # An example generator function we can't change that accepts an iterable
   def my_gen(iterable):
       for a in iterable:
           yield a * 2

   async_input_iterable = arange(0, 10000)
   async_output_iterable = # Something to to with `my_gen`
   async for v in async_output_iterable:
       print(v)

asyncio.run(main())

要使这项工作成功,

# Something...
可以做什么?

python python-asyncio generator
1个回答
0
投票

# Something...
可以有两个组件

  1. 一个
    to_sync_iter
    函数,用于将输入异步迭代转换为同步迭代以传递给生成器函数
  2. A
    to_async_iter
    将生成器函数的输出转换为异步迭代
import asyncio

def to_sync_iter(async_iter, loop):
    async_it = aiter(async_iter)
    while True:
        try:
            yield asyncio.run_coroutine_threadsafe(anext(async_it), loop).result()
        except StopAsyncIteration:
            break

async def to_async_iter(sync_iter):
    it = iter(sync_iter)
    done = object()

    def safe_next():
        # Converts StopIteration to a sentinal value to avoid:
        # TypeError: StopIteration interacts badly with generators and cannot be raised into a Future
        try:
            return next(it)
        except StopIteration:
            return done

    while True:
        value =  await asyncio.to_thread(safe_next)
        if value is done:
            break
        yield value

请注意,这不会在迭代时阻止事件循环 - 同步代码在线程中运行。

例如如下:

async def main():
    async def arange(start, end):
        for i in range(start, end):
            yield(i)
            await asyncio.sleep(0)

    def my_gen(iterable):
        for a in iterable:
            yield a * 2

    async_input_iterable = arange(0, 10000)
    sync_input_iterable = to_sync_iter(input_iterable, asyncio.get_running_loop())
    sync_ouput_iterable = my_gen(sync_input_iterable)
    async_output_iterable = to_async_iter(sync_ouput_iterable)
    async for v in async_output_iterable:
        print(v)

asyncio.run(main())

这受到 https://github.com/uktrade/stream-zip/issues/87#issuecomment-1695123135 中代码的启发,该代码执行类似于包装stream-zip的操作以使其在异步上下文中工作。

我真的无法评价它的性能。

© www.soinside.com 2019 - 2024. All rights reserved.