SQLalchemy - 如何在运行测试用例时产生两个数据库连接

问题描述 投票:0回答:1

下面是database.py文件:

def get_dba_pgdsn() -> str:
    return f"mysql://{user}:{pass}@dba"


def get_dbb_pgdsn() -> str:
    return f"mysql://{user}:{pass}@dbb"


def engines() -> Engine:
    engines = {
        "dba": get_dba_pgdsn(),
        "dbb": get_dbb_pgdsn()
    }
    return engines


def get_engine() -> Engine:
    if ENGINE is None:
        raise Exception(
            "SQLAlchemy DB Engine has not been created yet. "
            "This is normally done in database.init()"
        )
    return ENGINE


Base = declarative_base(cls=SQLAlchemyBase)

engine_list = engines()

Session_a = scoped_session(sessionmaker(engine_list["dba"]))
Session_b = scoped_session(sessionmaker(engine_list["dbb"]))


def init(session_expire_on_commit=True):
    global ENGINE

    if not ENGINE:
        ENGINE = engines()
        Session_a.configure(bind=engine_list["dba"], expire_on_commit=session_expire_on_commit)
        Session_b.configure(bind=engine_list["dbb"], expire_on_commit=session_expire_on_commit)

这是我的conftest 文件:

from shared import database
from sqlalchemy import text
import pytest


@pytest.fixture()
def db_session_a():
    
    database.init()
    yield database.Session_a
    database.Session_a.rollback()
    for table in reversed(database.Base.metadata.sorted_tables):
        if table.name != "flyway_schema_history":
            sql_stmt = text(f"TRUNCATE TABLE {table.name} RESTART IDENTITY CASCADE;")
            database.Session_a.execute(sql_stmt)
    database.Session_a.commit()
    database.Session_a.close_all()


@pytest.fixture()
def db_session_b():
    
    database.init()
    yield database.Session_b
    database.Session_b.rollback()
    for table in reversed(database.Base.metadata.sorted_tables):
        if table.name != "flyway_schema_history":
            sql_stmt = text(f"TRUNCATE TABLE {table.name} RESTART IDENTITY CASCADE;")
            database.Session_b.execute(sql_stmt)
    database.Session_b.commit()
    database.Session_b.close_all()

最后是测试用例

def test_run(self, db_session_a, db_session_b):
    setup_sql_db()
    
    entry = factories.entry(id=1)
    
    print("entry:" + str(entry.id))

运行时出现此错误

self = <sqlalchemy.orm.session.SessionTransaction object at 0x147f27310>
bind = "mysql://{user}:{pass}@dba"
execution_options = None

def _connection_for_bind(self, bind, execution_options):
    self._assert_active()

    if bind in self._connections:
        if execution_options:
            util.warn(
                "Connection is already established for the "
                "given bind; execution_options ignored"
            )
        return self._connections[bind][0]

    local_connect = False
    should_commit = True

    if self._parent:
        conn = self._parent._connection_for_bind(bind, execution_options)
        if not self.nested:
            return conn
    else:
        if isinstance(bind, engine.Connection):
            conn = bind
            if conn.engine in self._connections:
                raise sa_exc.InvalidRequestError(
                    "Session already has a Connection associated for the "
                    "given Connection's Engine"
                )
        else:
            conn = bind.connect()
            AttributeError: 'str' object has no attribute 'connect'

所以我的问题是上面的代码,如何在测试用例执行期间正确绑定这两个数据库连接?

我希望能够同时建立两个数据库连接,因为我将有需要同时从两个数据库执行查询的测试用例。

python session sqlalchemy mysql-connector
1个回答
0
投票

您遇到的问题是您的

engines()
函数没有返回引擎的字典,而是返回连接字符串的字典:

def engines() -> Engine:
    engines = {
        "dba": get_dba_pgdsn(),
        "dbb": get_dbb_pgdsn()
    }
    return engines

Session.configure(bind=...)
采用引擎或连接,而不是连接字符串。

试试这个:

def engines() -> dict[str, Engine]:
    engines = {
        "dba": create_engine(get_dba_pgdsn()),
        "dbb": create_engine(get_dbb_pgdsn()),
    }
    return engines
© www.soinside.com 2019 - 2024. All rights reserved.