以下是开启外部事务的示例:
import random
from asyncio import current_task
from functools import wraps
from fastapi import APIRouter
from pydantic import BaseModel
from sqlalchemy import MetaData
from sqlalchemy.ext.asyncio import async_scoped_session, async_sessionmaker, create_async_engine
from sqlalchemy.orm import declarative_base, Mapped, mapped_column
# Router that the endpoint below registers itself on.
router = APIRouter()

# "url" is a placeholder connection string; echo=True makes the engine log
# the SQL it emits.
engine = create_async_engine("url", echo=True)

# Session registry keyed by the running asyncio task: every task that asks
# for a session gets its own instance from the factory.
scoped_session = async_scoped_session(
    async_sessionmaker(
        bind=engine,
        expire_on_commit=False,
        autocommit=False,
        autoflush=False,
    ),
    scopefunc=current_task,
)

# Declarative base sharing one explicit MetaData collection.
metadata = MetaData()
Base = declarative_base(metadata=metadata)
class Country(Base):
    """ORM model for a row of the ``countries`` table."""

    # A declarative class needs a table name before it can be mapped; the
    # logged INSERT statements target ``countries``.
    # NOTE(review): the logged INSERT also writes an ``is_active`` column
    # that this trimmed-down model does not declare.
    __tablename__ = "countries"

    # Auto-incrementing surrogate primary key.
    id: Mapped[int] = mapped_column(autoincrement=True, primary_key=True, index=True)
    code: Mapped[str]
    name: Mapped[str]
class TestTransactionDataSchema(BaseModel):
    """Request body accepted (and returned unchanged) by the demo endpoint."""

    first_name: str
    last_name: str
@router.post("/test-external-transaction")
async def test_external_transaction(data: TestTransactionDataSchema):
    """Insert two random ``Country`` rows inside one engine-level transaction.

    The task-scoped session is created with ``bind=connection``, i.e. bound
    to a connection whose transaction has already begun. The accompanying
    log output shows a single BEGIN/COMMIT pair around both INSERTs, so the
    per-row ``session.commit()`` calls do not end the outer transaction.

    Args:
        data: Request payload; it is not persisted, only returned.

    Returns:
        The unchanged ``data`` payload.
    """
    async with engine.begin() as connection:
        session = scoped_session(bind=connection)
        try:
            for _ in range(2):
                code = str(random.randint(1, 100_000))
                name = str(random.randint(1, 100_000))
                session.add(Country(code=code, name=name))
                await session.commit()
        finally:
            # Always drop the task-scoped session, including on failure, so
            # it does not linger in the registry.
            await scoped_session.remove()
    return data
外部事务被打开,并且会话加入了该事务:
2024-03-12 12:30:36,622 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-03-12 12:30:36,691 INFO sqlalchemy.engine.Engine INSERT INTO countries (code, name, is_active) VALUES ($1::VARCHAR, $2::VARCHAR, $3::BOOLEAN) RETURNING countries.id
2024-03-12 12:30:36,691 INFO sqlalchemy.engine.Engine [generated in 0.00017s] ('54573', '98157', True)
2024-03-12 12:30:36,696 INFO sqlalchemy.engine.Engine INSERT INTO countries (code, name, is_active) VALUES ($1::VARCHAR, $2::VARCHAR, $3::BOOLEAN) RETURNING countries.id
2024-03-12 12:30:36,696 INFO sqlalchemy.engine.Engine [cached since 0.005118s ago] ('17932', '95985', True)
2024-03-12 12:30:36,698 INFO sqlalchemy.engine.Engine COMMIT
附注:会话不会在每次提交时开启自己的事务。
现在我想(出于某些原因)将事务管理转移到装饰器 `atomic` 中:
import random
from asyncio import current_task
from functools import wraps
from fastapi import APIRouter, Depends
from pydantic import BaseModel
from sqlalchemy import MetaData
from sqlalchemy.ext.asyncio import async_scoped_session, async_sessionmaker, create_async_engine, AsyncSession
from sqlalchemy.orm import declarative_base, Mapped, mapped_column
# Router that the endpoint below registers itself on.
router = APIRouter()

# "url" is a placeholder connection string; echo=True makes the engine log
# the SQL it emits.
engine = create_async_engine("url", echo=True)

# Session registry keyed by the running asyncio task: every task that asks
# for a session gets its own instance from the factory.
scoped_session = async_scoped_session(
    async_sessionmaker(
        bind=engine,
        expire_on_commit=False,
        autocommit=False,
        autoflush=False,
    ),
    scopefunc=current_task,
)

# Declarative base sharing one explicit MetaData collection.
metadata = MetaData()
Base = declarative_base(metadata=metadata)
class Country(Base):
    """ORM model for a row of the ``countries`` table."""

    # A declarative class needs a table name before it can be mapped; the
    # logged INSERT statements target ``countries``.
    # NOTE(review): the logged INSERT also writes an ``is_active`` column
    # that this trimmed-down model does not declare.
    __tablename__ = "countries"

    # Auto-incrementing surrogate primary key.
    id: Mapped[int] = mapped_column(autoincrement=True, primary_key=True, index=True)
    code: Mapped[str]
    name: Mapped[str]
async def get_session():
    """Yield the current task's scoped session as a FastAPI dependency.

    Yields:
        The session that ``scoped_session`` holds for the running task.
    """
    session = scoped_session()
    try:
        yield session
    finally:
        # Runs even when the endpoint raises, so the task-scoped session is
        # always closed and removed from the registry.
        await scoped_session.remove()
def atomic(func):
    """Wrap ``func`` so it runs inside an ``engine.begin()`` block.

    Before awaiting ``func`` the wrapper assigns the outer connection to the
    ``bind`` attribute of the current scoped session.

    NOTE: the accompanying log output shows that every ``commit()`` still
    opens and commits its own transaction, i.e. this assignment does not
    make the session join the outer transaction.

    Args:
        func: Coroutine function to wrap.

    Returns:
        The wrapping coroutine function.
    """

    @wraps(func)
    async def wrapper(*args, **kwargs):
        async with engine.begin() as outer_connection:
            # Per the original author's note the session is already present
            # in scoped_session.registry.registry here, so this presumably
            # returns that existing session rather than a new one.
            task_session = scoped_session()
            task_session.bind = outer_connection
            return await func(*args, **kwargs)

    return wrapper
class TestTransactionDataSchema(BaseModel):
    """Request body accepted (and returned unchanged) by the demo endpoint."""

    first_name: str
    last_name: str
@router.post("/test-external-transaction")
@atomic
async def test_external_transaction(
    data: TestTransactionDataSchema,
    session: AsyncSession = Depends(get_session),
):
    """Insert two random ``Country`` rows, committing after each one.

    Args:
        data: Request payload; it is not persisted, only returned.
        session: Session supplied by the ``get_session`` dependency.

    Returns:
        The unchanged ``data`` payload.
    """
    for _ in range(2):
        code = str(random.randint(1, 100_000))
        name = str(random.randint(1, 100_000))
        session.add(Country(code=code, name=name))
        await session.commit()
    return data
据我猜测,我需要覆盖作用域注册表中会话的绑定。但是 `session.bind = connection` 不起作用,因为会话在每次提交时都会开启新的事务:
2024-03-12 12:41:17,265 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-03-12 12:41:17,424 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-03-12 12:41:17,426 INFO sqlalchemy.engine.Engine INSERT INTO countries (code, name, is_active) VALUES ($1::VARCHAR, $2::VARCHAR, $3::BOOLEAN) RETURNING countries.id
2024-03-12 12:41:17,426 INFO sqlalchemy.engine.Engine [generated in 0.00019s] ('20269', '40922', True)
2024-03-12 12:41:17,431 INFO sqlalchemy.engine.Engine COMMIT
2024-03-12 12:41:17,435 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-03-12 12:41:17,436 INFO sqlalchemy.engine.Engine INSERT INTO countries (code, name, is_active) VALUES ($1::VARCHAR, $2::VARCHAR, $3::BOOLEAN) RETURNING countries.id
2024-03-12 12:41:17,436 INFO sqlalchemy.engine.Engine [cached since 0.01033s ago] ('67350', '57030', True)
2024-03-12 12:41:17,439 INFO sqlalchemy.engine.Engine COMMIT
2024-03-12 12:41:17,441 INFO sqlalchemy.engine.Engine COMMIT
这意味着会话不加入外部事务。
我的目标是在包装视图之外管理事务。
如何让会话加入外部事务?
为了确保会话加入外部事务并且不会在每次提交时打开新事务,需要确保会话正确绑定到外部连接。在您的 `atomic` 装饰器中,您当前正在装饰器内创建一个新会话,该会话不会绑定到外部连接。
您可以修改 `atomic` 装饰器以接受外部会话并将其绑定到作用域会话。
import random
from functools import wraps

from fastapi import APIRouter, Depends
from pydantic import BaseModel
from sqlalchemy import MetaData
from sqlalchemy.ext.asyncio import async_scoped_session, async_sessionmaker, create_async_engine, AsyncSession
from sqlalchemy.orm import Mapped, declarative_base, mapped_column, sessionmaker
# Router that the endpoint below registers itself on.
router = APIRouter()

# "url" is a placeholder connection string; echo=True makes the engine log
# the SQL it emits.
engine = create_async_engine("url", echo=True)

# Factory for AsyncSession objects that are bound to the engine by default.
SessionLocal = async_sessionmaker(
    bind=engine,
    expire_on_commit=False,
    autocommit=False,
    autoflush=False,
)

# Declarative base sharing one explicit MetaData collection.
metadata = MetaData()
Base = declarative_base(metadata=metadata)
class Country(Base):
    """ORM model for a row of the ``countries`` table."""

    # A declarative class needs a table name and a primary key, and its
    # column annotations must be wrapped in ``Mapped[...]``; bare ``int`` /
    # ``str`` annotations are rejected when the class is mapped.
    __tablename__ = "countries"

    # Auto-incrementing surrogate primary key.
    id: Mapped[int] = mapped_column(autoincrement=True, primary_key=True, index=True)
    code: Mapped[str]
    name: Mapped[str]
class TestTransactionDataSchema(BaseModel):
    """Request body accepted (and returned unchanged) by the demo endpoint."""

    first_name: str
    last_name: str
async def get_session():
    """Yield a fresh session from ``SessionLocal`` as a FastAPI dependency.

    The session is used as an async context manager, so it is closed when
    the dependency is torn down.

    Yields:
        A new ``AsyncSession`` produced by ``SessionLocal``.
    """
    async with SessionLocal() as db_session:
        yield db_session
def atomic(func):
    """Run ``func`` with a session that joins one engine-level transaction.

    The wrapper opens ``engine.begin()`` and builds a session whose bind is
    that already-begun connection. If the call carries a ``session`` keyword
    argument (FastAPI passes resolved dependencies by keyword), that
    argument is replaced with the connection-bound session, so the wrapped
    function works on the outer transaction.

    The bind has to be given when the session is constructed, and the
    session has to be the one the wrapped function actually uses. Creating
    a separate session and assigning ``bind`` afterwards leaves the wrapped
    function on its own engine-bound session.

    Args:
        func: Coroutine function to wrap; it may accept a ``session``
            keyword argument.

    Returns:
        The wrapping coroutine function.
    """

    @wraps(func)
    async def wrapper(*args, **kwargs):
        async with engine.begin() as connection:
            async with SessionLocal(bind=connection) as session:
                # Only substitute when the caller supplied ``session``, so
                # functions without that parameter keep working.
                if "session" in kwargs:
                    kwargs["session"] = session
                return await func(*args, **kwargs)

    return wrapper
@router.post("/test-external-transaction")
@atomic
async def test_external_transaction(
    data: TestTransactionDataSchema,
    session: AsyncSession = Depends(get_session),
):
    """Insert two random ``Country`` rows, committing after each one.

    Args:
        data: Request payload; it is not persisted, only returned.
        session: Session the rows are added to and committed through.

    Returns:
        The unchanged ``data`` payload.
    """
    # Transaction logic: two independent insert-and-commit rounds.
    for _ in range(2):
        code = str(random.randint(1, 100_000))
        name = str(random.randint(1, 100_000))
        session.add(Country(code=code, name=name))
        await session.commit()
    return data
`atomic` 装饰器现在接受由 `SessionLocal`(通过 `async_sessionmaker` 创建)生成的外部会话。`test_external_transaction` 使用 FastAPI 提供的 `Depends` 机制接受会话作为依赖项。