将采用同步可迭代的生成器函数转换为采用异步可迭代的异步生成器函数

问题描述 投票:0回答:1

我有一个无法更改的同步生成器函数(因为它在库中),它本身接受同步迭代。

但我想从异步上下文中使用它,向其传递异步可迭代对象,并且在迭代期间不阻塞事件循环。我该怎么做?

例如,假设我们有

my_gen
,它采用一个可迭代的整数,旨在在同步上下文中工作。

input_iterable = range(0, 10000)
output_iterable = my_gen(input_iterable)
for v in output_iterable:
    print(v)

但我想这样使用它:

async def main():
   # Just for example purposes - there is no "arange" built into Python
   async def arange(start, end):
        for i in range(start, end):
            yield(i)
            await asyncio.sleep(0)

   # An example generator function we can't change that accepts an iterable
   def my_gen(iterable):
       for a in iterable:
           yield a * 2

   input_iterable = arange(0, 10000)
   output_iterable = # Something to to with `my_gen`
   async for v in ouput_iterable:
       print(v)

asyncio.run(main())

要使这项工作成功,

# Something...
可以做什么?

python python-asyncio generator
1个回答
0
投票

# Something...
可以使用以下
to_async_gen
函数来包装同步生成器函数,使其在异步上下文中工作。它假设它的第一个参数是同步迭代。

import asyncio

async def to_async_gen(sync_gen, async_iter, *args, **kwargs):

    def sync_iterable(loop, async_iterable):
        async_it = aiter(async_iterable)
        while True:
            try:
                yield asyncio.run_coroutine_threadsafe(anext(async_it), loop).result()
            except StopAsyncIteration:
                break

    def to_thread_safe_next(it, done):
        try:
            return next(it)
        except StopIteration:
            return done

    loop = asyncio.get_event_loop()
    it = iter(sync_gen(sync_iterable(loop, async_iter), *args, **kwargs))
    done = object()

    while True:
        value =  await asyncio.to_thread(to_thread_safe_next, it, done)
        if value is done:
            break
        yield value

使用如下

async def main():
    async def arange(start, end):
        for i in range(start, end):
            yield(i)
            await asyncio.sleep(0)

    def my_gen(iterable):
        for a in iterable:
            yield a * 2

    input_iterable = arange(0, 10000)
    output_iterable = to_async_gen(my_gen, input_iterable)
    async for v in output_iterable:
        print(v)

asyncio.run(main())

这受到 https://github.com/uktrade/stream-zip/issues/87#issuecomment-1695123135 中代码的启发,该代码执行类似于包装stream-zip的操作以使其在异步上下文中工作。

我真的无法评价它的性能。

© www.soinside.com 2019 - 2024. All rights reserved.