带有 ORM 的 SQLAlchemy 异步引擎无法获取数据库表

问题描述 投票:0回答:1

我已将 SQLAlchemy 数据库代码切换为使用异步引擎,但在建立基本功能时遇到困难。

我有一个类可以像这样启动数据库:

class PostgresDb:

    def __init__(self):
        self._session = None
        self._engine = None

    def __getattr__(self, name):
        return getattr(self._session, name)

    def init(self):
        self._engine = create_async_engine(
            ENGINE,
            echo=True,
            future=True)

        self._session = sessionmaker(
            self._engine, expire_on_commit=False, class_=AsyncSession
        )()

   async def create_all(self):
        async with self._engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)

   # Other methods...

如何调用 create_all 的示例:

async def init_db_tables():
    self.init()
    await self.create_all()
asyncio.run(init_db_tables())

当我想实现基本功能时,比如获取所有表格,我可以这样做:

def get_tables(self):
    with create_engine(SYNCHRONOUS_ENGINE).connect() as conn:
        meta = MetaData(conn, schema=SCHEMA)
        meta.reflect(views=True)
        table_list = meta.tables
        return table_list

这并不理想,因为我需要实际传递同步引擎连接,而不是我在类中使用的实际异步引擎。它也非常冗长,不需要为每个查询都像这样启动。

我尝试执行类似的操作来从数据库中选择表“appuser”:

async def get_tables(self):
    self.init()
    async with self._session() as session:
        q = select('appuser')
        result = await session.execute(q)
        curr = result.scalars()
        for i in curr:
            print(i)

我尝试过这样打电话

db = PostgresDb()
asyncio.run(db.get_tables())
asyncio.get_event_loop().run_until_complete(db.get_tables())

这两个都会给出错误:

async with self._session() as session:
TypeError: 'AsyncSession' object is not callable

调用它时出现

db.get_tables()
错误
RuntimeWarning: coroutine 'PostgresDb.get_tables' was never awaited db.get_tables() RuntimeWarning: Enable tracemalloc to get the object allocation traceback

尝试将

inspector
run_sync
一起使用,如下所示:

    async def get_tables(self):
        await self.init()
        async with self._engine.begin() as conn:
            inspector = conn.run_sync(inspect(conn))
            table_names = await conn.run_sync(inspector.get_table_names())
            print(table_names)

返回错误

sqlalchemy.exc.NoInspectionAvailable: Inspection on an AsyncConnection is currently not supported. Please use ``run_sync`` to pass a callable where it's possible to call ``inspect`` on the passed connection. 

我已阅读https://docs.sqlalchemy.org/en/14/orm/extensions/asyncio.html#sqlalchemy.ext.asyncio.AsyncConnection.run_sync的文档,但我仍然不清楚如何干净地工作使用异步引擎。

如何使用异步引擎在 SQLAlchemy 中执行简单查询

get all tables

python-3.x asynchronous sqlalchemy orm
1个回答
0
投票

您可以通过运行反映连接的

run_sync
方法中的元数据来获取表:

import sqlalchemy as sa

...

    async def get_tables(self):
        self.init()
        async with self._engine.connect() as conn:
            # We could just use Base.metadata, but perhaps
            # the database contains other tables as well as those
            # defined in Base's metadata.
            metadata = sa.MetaData(schema='public')
            await conn.run_sync(metadata.reflect)
        return metadata.tables

要查询单个表,仅给出表名称,我们可以将名称包装在表对象中,并将其和

'*'
传递给
select
(或传递
sa.text(select * from appusers)
)。

    async def read_table(self):
        self.init()
        async with self._session() as session:
            q = sa.select(sa.table('appusers'), sa.text('*'))
            result = await session.execute(q)
            rows = result.scalars()
            for i in rows:
                print(i)

错误

“AsyncSession”对象不可调用

引发

是因为

orm.sessionmaker(...)()
已经返回了
Session
实例,因此无需调用它。根据您希望类在其整个生命周期中使用单个会话,还是每次调用方法时使用新会话,您可以在使用会话时不调用该会话:

async with self._session as session:
    ...

或者不立即调用调用

sessionmaker
的结果:

self._session = orm.sessionmaker(...)

async with self._session() as session:
    ...
© www.soinside.com 2019 - 2024. All rights reserved.