覆盖现有会话的绑定以使其加入外部事务?

问题描述 投票:0回答:1

以下是开启外部事务的示例:

import random
from asyncio import current_task
from functools import wraps

from fastapi import APIRouter
from pydantic import BaseModel
from sqlalchemy import MetaData
from sqlalchemy.ext.asyncio import async_scoped_session, async_sessionmaker, create_async_engine
from sqlalchemy.orm import declarative_base, Mapped, mapped_column
    
# Router that the demo endpoint below registers itself on.
router = APIRouter()

# Async engine; "url" is presumably a placeholder DSN. echo=True produces
# the statement log quoted after this snippet.
engine = create_async_engine("url", echo=True)


# Session registry scoped by the currently running asyncio task.
scoped_session = async_scoped_session(
    async_sessionmaker(
        bind=engine,
        expire_on_commit=False,
        autocommit=False,
        autoflush=False,
    ),
    scopefunc=current_task,
)

# Declarative base built on an explicitly created MetaData collection.
metadata = MetaData()
Base = declarative_base(metadata=metadata)


class Country(Base):
    """ORM model for a row of the ``countries`` table."""

    # A declarative class must name its table. "countries" is the table the
    # INSERT statements in the engine log below this snippet are issued against.
    __tablename__ = "countries"

    # Auto-incrementing surrogate primary key.
    id: Mapped[int] = mapped_column(autoincrement=True, primary_key=True, index=True)
    code: Mapped[str]
    name: Mapped[str]



class TestTransactionDataSchema(BaseModel):
    # Request body of the demo endpoint; the endpoint does not persist these
    # values, it only returns the payload unchanged.
    first_name: str
    last_name: str


@router.post("/test-external-transaction")
async def test_external_transaction(data: TestTransactionDataSchema):
    # Demo endpoint: inserts two random countries through a session that is
    # bound to an externally opened connection, then echoes `data` back.
    # (Comments are used instead of a docstring so the generated OpenAPI
    # description of the route stays unchanged.)
    async with engine.begin() as connection:
        # Binding the new task-scoped session to `connection` makes it work
        # inside the transaction started by engine.begin(); per the log below
        # the snippet, session.commit() does not emit its own BEGIN/COMMIT.
        session = scoped_session(bind=connection)
        try:
            for _ in range(2):
                country = Country(
                    code=str(random.randint(1, 100_000)),
                    name=str(random.randint(1, 100_000)),
                )
                session.add(country)
                await session.commit()
        finally:
            # Always drop the session from the registry, even when an insert
            # or commit fails; otherwise a session bound to an already closed
            # connection would stay registered for this task.
            await scoped_session.remove()
    return data

外部事务已经开启,并且会话加入了该事务:

2024-03-12 12:30:36,622 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-03-12 12:30:36,691 INFO sqlalchemy.engine.Engine INSERT INTO countries (code, name, is_active) VALUES ($1::VARCHAR, $2::VARCHAR, $3::BOOLEAN) RETURNING countries.id
2024-03-12 12:30:36,691 INFO sqlalchemy.engine.Engine [generated in 0.00017s] ('54573', '98157', True)
2024-03-12 12:30:36,696 INFO sqlalchemy.engine.Engine INSERT INTO countries (code, name, is_active) VALUES ($1::VARCHAR, $2::VARCHAR, $3::BOOLEAN) RETURNING countries.id
2024-03-12 12:30:36,696 INFO sqlalchemy.engine.Engine [cached since 0.005118s ago] ('17932', '95985', True)
2024-03-12 12:30:36,698 INFO sqlalchemy.engine.Engine COMMIT

请注意:会话不会在每次提交时开启自己的事务。

现在(出于某些原因)我想把事务管理转移到 `atomic` 装饰器中:

import random
from asyncio import current_task
from functools import wraps

from fastapi import APIRouter, Depends
from pydantic import BaseModel
from sqlalchemy import MetaData
from sqlalchemy.ext.asyncio import async_scoped_session, async_sessionmaker, create_async_engine, AsyncSession
from sqlalchemy.orm import declarative_base, Mapped, mapped_column
    
# Router that the decorated demo endpoint below registers itself on.
router = APIRouter()

# Async engine; "url" is presumably a placeholder DSN. echo=True produces
# the statement log quoted after this snippet.
engine = create_async_engine("url", echo=True)


# Session registry scoped by the currently running asyncio task.
scoped_session = async_scoped_session(
    async_sessionmaker(
        bind=engine,
        expire_on_commit=False,
        autocommit=False,
        autoflush=False,
    ),
    scopefunc=current_task,
)

# Declarative base built on an explicitly created MetaData collection.
metadata = MetaData()
Base = declarative_base(metadata=metadata)


class Country(Base):
    """ORM model for a row of the ``countries`` table."""

    # A declarative class must name its table. "countries" is the table the
    # INSERT statements in the engine log below this snippet are issued against.
    __tablename__ = "countries"

    # Auto-incrementing surrogate primary key.
    id: Mapped[int] = mapped_column(autoincrement=True, primary_key=True, index=True)
    code: Mapped[str]
    name: Mapped[str]


async def get_session():
    """Yield the task-scoped session and always discard it afterwards.

    Used as a FastAPI dependency (``Depends(get_session)``). The session is
    taken from ``scoped_session``, so every call made from the same asyncio
    task receives the same object.

    Yields:
        The session registered for the current task.
    """
    session = scoped_session()
    try:
        yield session
    finally:
        # An exception raised by the endpoint is re-raised at the ``yield``;
        # without ``finally`` the registry entry would never be removed.
        await scoped_session.remove()


def atomic(func):
    """Decorator meant to run ``func`` inside the ``engine.begin()`` transaction.

    NOTE(review): this is the variant the question reports as not working.
    The engine log quoted after the snippet shows the session still issuing
    its own BEGIN/COMMIT around every ``session.commit()``, i.e. assigning
    ``session.bind`` here does not make the session join the outer
    transaction.

    Args:
        func: Coroutine function to wrap; it is awaited with the original
            positional and keyword arguments.

    Returns:
        The wrapping coroutine function, which returns whatever ``func``
        returns.
    """

    @wraps(func)
    async def wrapper(*args, **kwargs):
        async with engine.begin() as connection:
            session = scoped_session()
            # The registry already holds a session for this scope at this
            # point (presumably the one created by the `get_session`
            # dependency -- confirm), so this call returns that object.
            session.bind = connection
            result = await func(*args, **kwargs)

        return result

    return wrapper


class TestTransactionDataSchema(BaseModel):
    # Request body of the demo endpoint; the endpoint does not persist these
    # values, it only returns the payload unchanged.
    first_name: str
    last_name: str


@router.post("/test-external-transaction")
@atomic
async def test_external_transaction(data: TestTransactionDataSchema, session: AsyncSession = Depends(get_session)):
    # Store two countries with random code/name values, committing after
    # each insert, then hand the request payload back to the caller.
    for _ in range(2):
        random_code = str(random.randint(1, 100_000))
        random_name = str(random.randint(1, 100_000))
        session.add(Country(code=random_code, name=random_name))
        await session.commit()
    return data

据我推测,我需要覆盖作用域注册表中会话的绑定。但是 `session.bind = connection` 不起作用,因为会话在每次提交时都会开启一个新的事务:

2024-03-12 12:41:17,265 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-03-12 12:41:17,424 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-03-12 12:41:17,426 INFO sqlalchemy.engine.Engine INSERT INTO countries (code, name, is_active) VALUES ($1::VARCHAR, $2::VARCHAR, $3::BOOLEAN) RETURNING countries.id
2024-03-12 12:41:17,426 INFO sqlalchemy.engine.Engine [generated in 0.00019s] ('20269', '40922', True)
2024-03-12 12:41:17,431 INFO sqlalchemy.engine.Engine COMMIT
2024-03-12 12:41:17,435 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-03-12 12:41:17,436 INFO sqlalchemy.engine.Engine INSERT INTO countries (code, name, is_active) VALUES ($1::VARCHAR, $2::VARCHAR, $3::BOOLEAN) RETURNING countries.id
2024-03-12 12:41:17,436 INFO sqlalchemy.engine.Engine [cached since 0.01033s ago] ('67350', '57030', True)
2024-03-12 12:41:17,439 INFO sqlalchemy.engine.Engine COMMIT
2024-03-12 12:41:17,441 INFO sqlalchemy.engine.Engine COMMIT

这意味着会话不加入外部事务。

我的目标是在包装视图之外管理事务。

如何让会话加入外部事务?

sqlalchemy fastapi
1个回答
0
投票

为了确保会话加入外部事务、并且不会在每次提交时开启新的事务,需要确保会话正确绑定到外部连接。在您的 `atomic` 装饰器中,您当前是在装饰器内部创建一个新会话,而该会话不会绑定到外部连接。

您可以修改 `atomic` 装饰器,使其接受外部会话并将其绑定到作用域会话。

import inspect
import random
from functools import wraps

from fastapi import APIRouter, Depends
from pydantic import BaseModel
from sqlalchemy import MetaData
from sqlalchemy.ext.asyncio import async_scoped_session, async_sessionmaker, create_async_engine, AsyncSession
from sqlalchemy.orm import declarative_base, sessionmaker, Mapped, mapped_column

# Router that the decorated demo endpoint below registers itself on.
router = APIRouter()

# Async engine; "url" is presumably a placeholder DSN. echo=True turns on
# SQLAlchemy's statement logging.
engine = create_async_engine("url", echo=True)

# Factory producing engine-bound async sessions.
SessionLocal = async_sessionmaker(
    bind=engine,
    expire_on_commit=False,
    autocommit=False,
    autoflush=False,
)

# Declarative base built on an explicitly created MetaData collection.
metadata = MetaData()
Base = declarative_base(metadata=metadata)


class Country(Base):
    """ORM model for a row of the ``countries`` table."""

    # A declarative class must name its table. "countries" is the table the
    # INSERT statements in the question's engine log are issued against.
    __tablename__ = "countries"

    # Plain ``id: int`` style annotations do not define mapped columns; the
    # attributes have to be declared with ``Mapped[...]`` so the class gets
    # real columns and a primary key.
    id: Mapped[int] = mapped_column(autoincrement=True, primary_key=True, index=True)
    code: Mapped[str]
    name: Mapped[str]


class TestTransactionDataSchema(BaseModel):
    # Request body of the demo endpoint; the endpoint does not persist these
    # values, it only returns the payload unchanged.
    first_name: str
    last_name: str


async def get_session():
    """Provide a ``SessionLocal`` session to a FastAPI dependency consumer.

    The session is opened with ``async with``, so it is released as soon as
    the consumer is done and the generator is resumed or closed.

    Yields:
        A session created by ``SessionLocal``.
    """
    async with SessionLocal() as db_session:
        yield db_session


def atomic(func):
    """Run ``func`` with a session that joins one outer transaction.

    A connection-level transaction is opened with ``engine.begin()`` and a
    session bound to that connection is created for the duration of the
    call. If ``func`` declares a ``session`` parameter, that session is
    passed as the ``session`` keyword argument and replaces whatever value
    the caller (e.g. FastAPI's ``Depends``) supplied. ``session.commit()``
    calls inside ``func`` then work within the outer transaction, which is
    committed when the ``engine.begin()`` block exits without an error and
    rolled back otherwise.

    The earlier version created a session but never handed it to ``func``
    and only assigned ``session.bind``, so ``func`` kept using an
    engine-bound session that opened its own transaction on every commit.

    Args:
        func: Coroutine function to wrap. To take part in the outer
            transaction it must accept a ``session`` keyword argument.

    Returns:
        The wrapping coroutine function; it returns what ``func`` returns.
    """
    # Only inject the session into callables that can receive it, so the
    # decorator stays usable on functions without a ``session`` parameter.
    accepts_session = "session" in inspect.signature(func).parameters

    @wraps(func)
    async def wrapper(*args, **kwargs):
        async with engine.begin() as connection:
            # Passing ``bind`` at construction time is what actually ties
            # the session to the already-begun connection.
            async with SessionLocal(bind=connection) as session:
                if accepts_session:
                    kwargs["session"] = session
                result = await func(*args, **kwargs)

        return result

    return wrapper


@router.post("/test-external-transaction")
@atomic
async def test_external_transaction(data: TestTransactionDataSchema, session: AsyncSession = Depends(get_session)):
    # Store two countries with random code/name values, committing after
    # each insert, then hand the request payload back to the caller.
    for _ in range(2):
        random_code = str(random.randint(1, 100_000))
        random_name = str(random.randint(1, 100_000))
        session.add(Country(code=random_code, name=random_name))
        await session.commit()
    return data
  • `atomic` 装饰器现在接受由 `SessionLocal`(一个 `async_sessionmaker`)创建的外部会话。
  • 在装饰器内部,外部会话绑定到作用域会话,确保会话加入外部事务。
  • 视图函数 `test_external_transaction` 使用 FastAPI 提供的 `Depends` 机制接受会话作为依赖项。
© www.soinside.com 2019 - 2024. All rights reserved.