将 TaskGroup 与 Motor Aggregate 一起使用时出现空迭代器

问题描述 投票:0回答:1

使用 TaskGroups 迭代聚合结果时,我收到 StopIteration 错误,但其他情况下则不会。我创建了一个最小的示例来显示错误:

import asyncio

from motor.motor_asyncio import AsyncIOMotorClient


def create_aggregate():
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    collection = client["test"]["test"]

    return collection.aggregate([])

async def get_next(agg):
    item = await agg.next()
    print(item)

async def run_1_success():
    agg = create_aggregate()
    
    await get_next(agg)

async def run_2_success():
    agg = create_aggregate()
    await get_next(agg)

    async with asyncio.TaskGroup() as tg:
        tg.create_task(get_next(agg))
        tg.create_task(get_next(agg))

async def run_3_error():
    agg = create_aggregate()

    async with asyncio.TaskGroup() as tg:
        tg.create_task(get_next(agg))
        tg.create_task(get_next(agg))

运行

asyncio.run(run_1_success())
asyncio.run(run_2_success())
时,即使使用 TaskGroup,我也可以成功地迭代简单的聚合。但是在运行
asyncio.run(run_3_error())
时,我收到了 StopAsyncIteration 错误。

环境:

  • Python 3.12.2(tags/v3.12.2:6abddd9,2024 年 2 月 6 日,21:26:36)[MSC v.1937 64 位 (AMD64)]
  • 电机3.4.0
  • PyMongo 4.6.2
  • Windows 10 64 位。

谢谢你。

mongodb python-asyncio tornado-motor
1个回答
0
投票

从文档中,对于

MotorCollection.aggregate
,强调我的:

请注意,此方法返回一个

MotorCommandCursor
,它在第一次迭代时延迟运行聚合命令。为了使用 $out
$merge
 运行聚合,应用程序需要迭代光标

run_2_success

中,有一个初始的

await get_next(agg)
,执行聚合并等待光标返回。因此,可以在 TaskGroups
with
块中迭代光标。
但是在 

run_3_error

中,由于两个任务具有相同的

agg
聚合对象,因此它还没有准备好
next
- 而第一个任务仍在等待光标返回,而第二个任务已经在尝试迭代在光标上方(或者在任务执行时反之亦然)。
如果我在 

await asyncio.sleep(1)

中的两个任务之间添加

run_3_error
,那么它就可以工作。
此外,您不需要 

get_next(agg)

方法。

MotorCollection.aggregate
对象

返回一个

MotorCommandCursor

,可以像光标一样从 
find()
:
进行迭代

旁注:您不应该尝试并行迭代“一个游标”的结果。如果你想对结果进行异步操作,它应该是这样的:

async def do_something(res): print(f"doing something with: {res["item"]}") await asyncio.sleep(.1) print(f"done with: {res["item"]}") async def run_4_okay(): agg = create_aggregate() async with asyncio.TaskGroup() as tg: async for result in agg: tg.create_task(do_something(result)) (将它们附加到

tasks
 列表以便稍后跟踪结果)

输出,

使用此 
MongoDB 文档集合

doing something with: notebook doing something with: paper doing something with: postcard done with: paper done with: postcard done with: notebook

© www.soinside.com 2019 - 2024. All rights reserved.