我正在尝试实现一个名为 get_all_by_chunk() 的异步生成器,以使用 SQLAlchemy 和 AsyncSession 从我的数据库中获取数据块。然而,当前的实施并没有像预期的那样工作。
class BaseDAO(Generic[Model]):
def __init__(self, model: Type[Model], session: AsyncSession):
self.model = model
self.session = session
...
async def get_all_by_chunk(self, chunk_size=10_000):
result = await self.session.execute(
select(self.model).yield_per(chunk_size)
)
async for row in result.scalars():
yield row
结果:TypeError: object async_generator can't be used in 'await' expression
如何使用 SQLAlchemy 和 AsyncSession 将 get_all_by_chunk 方法正确地实现为异步生成器以从表中获取数据块?
python 3.11/sqlalchemy 2.0.13
我无法实现流式传输,但我设法通过以下方式实现了块中的获取:
async def get_records(self, *whereclauses, limit: int = None, offset: int = None):
stmt = select(self.model)
if whereclauses:
stmt = stmt.where(*whereclauses)
if limit:
stmt = stmt.limit(limit)
if offset:
stmt = stmt.offset(offset)
result = await self.session.execute(stmt)
return result.scalars().all()
async def get_chunk_iterator(self, *whereclauses, chunk_size: int):
offset = 0 # Start from the beginning
while True:
# Get the next batch of records
records = await self.get_records(*whereclauses, limit=chunk_size, offset=offset)
# If no more records, stop
if not records:
break
# Yield the records
yield records
# Update the offset for the next batch
offset += chunk_size