如何使用 SQLAlchemy 制作异步生成器?

问题描述 投票:0回答:1

我正在尝试实现一个名为 get_all_by_chunk() 的异步生成器,以使用 SQLAlchemy 和 AsyncSession 从我的数据库中获取数据块。然而,当前的实施并没有像预期的那样工作。

class BaseDAO(Generic[Model]):
    def __init__(self, model: Type[Model], session: AsyncSession):
        self.model = model
        self.session = session

    ...

    async def get_all_by_chunk(self, chunk_size=10_000):
        result = await self.session.execute(
            select(self.model).yield_per(chunk_size)
        )
        async for row in result.scalars():
            yield row

结果:TypeError: object async_generator can't be used in 'await' expression

如何使用 SQLAlchemy 和 AsyncSession 将 get_all_by_chunk 方法正确地实现为异步生成器以从表中获取数据块?

python 3.11/sqlalchemy 2.0.13

python asynchronous sqlalchemy generator
1个回答
0
投票

我无法实现流式传输,但我设法通过以下方式实现了块中的获取:

async def get_records(self, *whereclauses, limit: int = None, offset: int = None):
   stmt = select(self.model)

    if whereclauses:
        stmt = stmt.where(*whereclauses)
    if limit:
        stmt = stmt.limit(limit)
    if offset:
        stmt = stmt.offset(offset)
    result = await self.session.execute(stmt)
    return result.scalars().all()

async def get_chunk_iterator(self, *whereclauses, chunk_size: int):
    
    offset = 0  # Start from the beginning

    while True:
        # Get the next batch of records
        records = await self.get_records(*whereclauses, limit=chunk_size, offset=offset)

        # If no more records, stop
        if not records:
            break

        # Yield the records
        yield records

        # Update the offset for the next batch
        offset += chunk_size
© www.soinside.com 2019 - 2024. All rights reserved.