在我的代码库中,我有一个函数,它在资源上返回
AsyncIterator
。如果我异步请求每个节点,我们可以将此资源视为一个链表。我想使用异步函数处理每个节点,并表示我可以在等待当前元素处理时获取下一个元素。
举个例子,考虑以下函数
async def _get_next(prev: int) -> int:
print(f"Fetching element {prev+1}")
await asyncio.sleep(1)
return prev + 1
async def fetch_elements(n: int) -> AsyncIterable[int]:
curr = 0
for _ in range(n):
prev = curr
curr = _get_next(prev)
print(f"Yielding element {curr}")
yield curr
async def process_element(e: int) -> None:
print(f"Pushing element {e}")
await asyncio.sleep(1)
print(f"Element {e} pushed")
我的印象是我可以用
async for
实现这一目标,但事实似乎并非如此,就好像我跑步一样
async for elem in fetch_elements(n):
await push_element(elem)
我必须等待
push_element
完成才能开始获取下一个元素。有没有一种简单的方法可以实现这一点,或者我必须取消AsyncIterator
?
你想要类似的东西:
async def fetch_elements(n: int):
curr = 0
for _ in range(n - 1):
task = asyncio.create_task(_get_next(curr))
yield curr
curr = await task
yield curr
您希望在返回当前元素之前启动一项单独的任务来获取下一个元素。但在循环的最后一次迭代中,您必须小心不要获取下一个元素。
更新以专门处理最后一次迭代。