我有一个无法更改的同步生成器函数(因为它在库中),它本身接受同步迭代。
但我想从异步上下文中使用它,向其传递异步可迭代对象,并且在迭代期间不阻塞事件循环。我该怎么做?
例如,假设我们有
my_gen
,它采用一个可迭代的整数,旨在在同步上下文中工作。
input_iterable = range(0, 10000)
output_iterable = my_gen(input_iterable)
for v in output_iterable:
print(v)
但我想这样使用它:
async def main():
# Just for example purposes - there is no "arange" built into Python
async def arange(start, end):
for i in range(start, end):
yield(i)
await asyncio.sleep(0)
# An example generator function we can't change that accepts an iterable
def my_gen(iterable):
for a in iterable:
yield a * 2
input_iterable = arange(0, 10000)
output_iterable = # Something to to with `my_gen`
async for v in ouput_iterable:
print(v)
asyncio.run(main())
要使这项工作成功,
# Something...
可以做什么?
# Something...
可以使用以下 to_async_gen
函数来包装同步生成器函数,使其在异步上下文中工作。它假设它的第一个参数是同步迭代。
import asyncio
async def to_async_gen(sync_gen, async_iter, *args, **kwargs):
def sync_iterable(loop, async_iterable):
async_it = aiter(async_iterable)
while True:
try:
yield asyncio.run_coroutine_threadsafe(anext(async_it), loop).result()
except StopAsyncIteration:
break
def to_thread_safe_next(it, done):
try:
return next(it)
except StopIteration:
return done
loop = asyncio.get_event_loop()
it = iter(sync_gen(sync_iterable(loop, async_iter), *args, **kwargs))
done = object()
while True:
value = await asyncio.to_thread(to_thread_safe_next, it, done)
if value is done:
break
yield value
使用如下
async def main():
async def arange(start, end):
for i in range(start, end):
yield(i)
await asyncio.sleep(0)
def my_gen(iterable):
for a in iterable:
yield a * 2
input_iterable = arange(0, 10000)
output_iterable = to_async_gen(my_gen, input_iterable)
async for v in output_iterable:
print(v)
asyncio.run(main())
这受到 https://github.com/uktrade/stream-zip/issues/87#issuecomment-1695123135 中代码的启发,该代码执行类似于包装stream-zip的操作以使其在异步上下文中工作。
我真的无法评价它的性能。