使用 TaskGroups 迭代聚合结果时,我收到 StopIteration 错误,但其他情况下则不会。我创建了一个最小的示例来显示错误:
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient
def create_aggregate():
client = AsyncIOMotorClient("mongodb://localhost:27017")
collection = client["test"]["test"]
return collection.aggregate([])
async def get_next(agg):
item = await agg.next()
print(item)
async def run_1_success():
agg = create_aggregate()
await get_next(agg)
async def run_2_success():
agg = create_aggregate()
await get_next(agg)
async with asyncio.TaskGroup() as tg:
tg.create_task(get_next(agg))
tg.create_task(get_next(agg))
async def run_3_error():
agg = create_aggregate()
async with asyncio.TaskGroup() as tg:
tg.create_task(get_next(agg))
tg.create_task(get_next(agg))
运行
asyncio.run(run_1_success())
或 asyncio.run(run_2_success())
时,即使使用 TaskGroup,我也可以成功地迭代简单的聚合。但是在运行 asyncio.run(run_3_error())
时,我收到了 StopAsyncIteration 错误。
环境:
谢谢你。
MotorCollection.aggregate
,强调我的:
,它在第一次迭代时延迟运行聚合命令。为了使用MotorCommandCursor
$out
或$merge
运行聚合,应用程序需要迭代光标。 在
run_2_success
中,有一个初始的
await get_next(agg)
,执行聚合并等待光标返回。因此,可以在 TaskGroups with
块中迭代光标。但是在 run_3_error
中,由于两个任务具有相同的
agg
聚合对象,因此它还没有准备好 next
- 而第一个任务仍在等待光标返回,而第二个任务已经在尝试迭代在光标上方(或者在任务执行时反之亦然)。如果我在 await asyncio.sleep(1)
中的两个任务之间添加
run_3_error
,那么它就可以工作。此外,您不需要 get_next(agg)
方法。
MotorCollection.aggregate
对象
返回一个 ,可以像光标一样从find()
:进行迭代 旁注:您不应该尝试并行迭代“一个游标”的结果。如果你想对结果进行异步操作,它应该是这样的:
async def do_something(res):
print(f"doing something with: {res["item"]}")
await asyncio.sleep(.1)
print(f"done with: {res["item"]}")
async def run_4_okay():
agg = create_aggregate()
async with asyncio.TaskGroup() as tg:
async for result in agg:
tg.create_task(do_something(result))
(将它们附加到
tasks
列表以便稍后跟踪结果)
输出,
使用此MongoDB 文档集合