我有一个无法更改的同步生成器函数(因为它在库中),它本身接受同步迭代。
但我想从异步上下文中使用它,向其传递异步可迭代对象,并且在迭代期间不阻塞事件循环。我该怎么做?
例如,假设我们有
my_gen
,它采用一个可迭代的整数,旨在在同步上下文中工作。
sync_input_iterable = range(0, 10000)
sync_output_iterable = my_gen(input_iterable)
for v in output_iterable:
print(v)
但我想这样使用它:
async def main():
# Just for example purposes - there is no "arange" built into Python
async def arange(start, end):
for i in range(start, end):
yield(i)
await asyncio.sleep(0)
# An example generator function we can't change that accepts an iterable
def my_gen(iterable):
for a in iterable:
yield a * 2
async_input_iterable = arange(0, 10000)
async_output_iterable = # Something to to with `my_gen`
async for v in async_output_iterable:
print(v)
asyncio.run(main())
要使这项工作成功,
# Something...
可以做什么?
# Something...
可以有两个组件
to_sync_iter
函数,用于将输入异步迭代转换为同步迭代以传递给生成器函数to_async_iter
将生成器函数的输出转换为异步迭代import asyncio
def to_sync_iter(async_iter, loop):
async_it = aiter(async_iter)
while True:
try:
yield asyncio.run_coroutine_threadsafe(anext(async_it), loop).result()
except StopAsyncIteration:
break
async def to_async_iter(sync_iter):
it = iter(sync_iter)
done = object()
def safe_next():
# Converts StopIteration to a sentinal value to avoid:
# TypeError: StopIteration interacts badly with generators and cannot be raised into a Future
try:
return next(it)
except StopIteration:
return done
while True:
value = await asyncio.to_thread(safe_next)
if value is done:
break
yield value
请注意,这不会在迭代时阻止事件循环 - 同步代码在线程中运行。
例如如下:
async def main():
async def arange(start, end):
for i in range(start, end):
yield(i)
await asyncio.sleep(0)
def my_gen(iterable):
for a in iterable:
yield a * 2
async_input_iterable = arange(0, 10000)
sync_input_iterable = to_sync_iter(input_iterable, asyncio.get_running_loop())
sync_ouput_iterable = my_gen(sync_input_iterable)
async_output_iterable = to_async_iter(sync_ouput_iterable)
async for v in async_output_iterable:
print(v)
asyncio.run(main())
这受到 https://github.com/uktrade/stream-zip/issues/87#issuecomment-1695123135 中代码的启发,该代码执行类似于包装stream-zip的操作以使其在异步上下文中工作。
我真的无法评价它的性能。