我已将 SQLAlchemy 数据库代码切换为使用异步引擎,但在建立基本功能时遇到困难。
我有一个类可以像这样启动数据库:
class PostgresDb:
def __init__(self):
self._session = None
self._engine = None
def __getattr__(self, name):
return getattr(self._session, name)
def init(self):
self._engine = create_async_engine(
ENGINE,
echo=True,
future=True)
self._session = sessionmaker(
self._engine, expire_on_commit=False, class_=AsyncSession
)()
async def create_all(self):
async with self._engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
# Other methods...
如何调用 create_all 的示例:
async def init_db_tables():
self.init()
await self.create_all()
asyncio.run(init_db_tables())
当我想实现基本功能时,比如获取所有表格,我可以这样做:
def get_tables(self):
with create_engine(SYNCHRONOUS_ENGINE).connect() as conn:
meta = MetaData(conn, schema=SCHEMA)
meta.reflect(views=True)
table_list = meta.tables
return table_list
这并不理想,因为我需要实际传递同步引擎连接,而不是我在类中使用的实际异步引擎。它也非常冗长,不需要为每个查询都像这样启动。
我尝试执行类似的操作来从数据库中选择表“appuser”:
async def get_tables(self):
self.init()
async with self._session() as session:
q = select('appuser')
result = await session.execute(q)
curr = result.scalars()
for i in curr:
print(i)
我尝试过这样打电话
db = PostgresDb()
asyncio.run(db.get_tables())
asyncio.get_event_loop().run_until_complete(db.get_tables())
这两个都会给出错误:
async with self._session() as session:
TypeError: 'AsyncSession' object is not callable
调用它时出现
db.get_tables()
错误 RuntimeWarning: coroutine 'PostgresDb.get_tables' was never awaited db.get_tables() RuntimeWarning: Enable tracemalloc to get the object allocation traceback
尝试将
inspector
与 run_sync
一起使用,如下所示:
async def get_tables(self):
await self.init()
async with self._engine.begin() as conn:
inspector = conn.run_sync(inspect(conn))
table_names = await conn.run_sync(inspector.get_table_names())
print(table_names)
返回错误
sqlalchemy.exc.NoInspectionAvailable: Inspection on an AsyncConnection is currently not supported. Please use ``run_sync`` to pass a callable where it's possible to call ``inspect`` on the passed connection.
我已阅读https://docs.sqlalchemy.org/en/14/orm/extensions/asyncio.html#sqlalchemy.ext.asyncio.AsyncConnection.run_sync的文档,但我仍然不清楚如何干净地工作使用异步引擎。
感谢您就如何使用异步引擎在 SQLAlchmey 中执行简单查询
get all tables
提供的所有见解!
您可以通过运行反映连接的
run_sync
方法中的元数据来获取表:
import sqlalchemy as sa
...
async def get_tables(self):
self.init()
async with self._engine.connect() as conn:
# We could just use Base.metadata, but perhaps
# the database contains other tables as well as those
# defined in Base's metadata.
metadata = sa.MetaData(schema='public')
await conn.run_sync(metadata.reflect)
return metadata.tables
要查询单个表,仅给出表名称,我们可以将名称包装在表对象中,并将其和
'*'
传递给 select
(或传递 sa.text(select * from appusers)
)。
async def read_table(self):
self.init()
async with self._session() as session:
q = sa.select(sa.table('appusers'), sa.text('*'))
result = await session.execute(q)
rows = result.scalars()
for i in rows:
print(i)
错误
引发“AsyncSession”对象不可调用
是因为
orm.sessionmaker(...)()
已经返回了 Session
实例,因此无需调用它。根据您希望类在其整个生命周期中使用单个会话,还是每次调用方法时使用新会话,您可以在使用会话时不调用该会话:
async with self._session as session:
...
或者不立即调用调用
sessionmaker
的结果:
self._session = orm.sessionmaker(...)
async with self._session() as session:
...