连接.py
from typing import AsyncIterable
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy import NullPool
SessionFactoryType = async_sessionmaker[AsyncSession]
def create_session_factory(connection_url: str, **kwargs: dict[str, ...]) -> async_sessionmaker[AsyncSession]:
async_engine = create_async_engine(url=connection_url, **kwargs, poolclass=NullPool)
return async_sessionmaker(async_engine, autoflush=False, expire_on_commit=False)
async def create_async_session(session_factory: SessionFactoryType) -> AsyncIterable[AsyncSession]:
async with session_factory() as session:
yield session
工作单元.py
import contextlib
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession, AsyncSessionTransaction
from src.common.interfaces.unit_of_work import AbstractUnitOfWork
class SQLAlchemyUnitOfWork(AbstractUnitOfWork[AsyncSession, AsyncSessionTransaction]):
async def commit(self) -> None:
with contextlib.suppress(SQLAlchemyError):
await self.session.commit()
async def rollback(self) -> None:
with contextlib.suppress(SQLAlchemyError):
await self.session.rollback()
async def create_transaction(self) -> None:
if not self.session.in_transaction() and self.session.is_active:
self._transaction = await self.session.begin()
async def close_transaction(self) -> None:
if self.session.is_active and not self.session.in_transaction():
await self.session.close()
def unit_of_work_factory(session: AsyncSession) -> SQLAlchemyUnitOfWork:
return SQLAlchemyUnitOfWork(session=session)
接口/unit_of_work.py
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Protocol
from typing_extensions import Self
class UnitOfWork(Protocol):
@abstractmethod
async def __aenter__(self: Self) -> Self:
raise NotImplementedError
@abstractmethod
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
raise NotImplementedError
@abstractmethod
async def commit(self) -> None:
raise NotImplementedError
@abstractmethod
async def rollback(self) -> None:
raise NotImplementedError
@abstractmethod
async def create_transaction(self) -> None:
raise NotImplementedError
@abstractmethod
async def close_transaction(self) -> None:
raise NotImplementedError
class AbstractUnitOfWork[SessionType, TransactionType](ABC, UnitOfWork):
__slots__ = ("session", "_transaction")
def __init__(self, session: SessionType) -> None:
self.session: SessionType = session
self._transaction: TransactionType | None = None
async def __aenter__(self) -> AbstractUnitOfWork[SessionType, TransactionType]:
await self.create_transaction()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
if self._transaction:
if exc_type:
await self.rollback()
else:
await self.commit()
await self.close_transaction()
交互器.py
from typing import Type
from sqlalchemy.ext.asyncio import AsyncSession
from src.common.interfaces.repository import Repository
from src.databases.orm.repositories.crud import SQLAlchemyCRUDRepository
class BaseRepository[ModelType](Repository):
__slots__ = ("model", "_session", "crud")
model: Type[ModelType]
def __init__(self, session: AsyncSession) -> None:
self._session = session
self.crud = SQLAlchemyCRUDRepository(session=session, model=self.model)
class BaseInteractor[ModelType]:
__slots__ = ("repository",)
def __init__(self, repository: BaseRepository[ModelType]) -> None:
self.repository = repository
crud.py
from typing import Any, Mapping, Optional, Sequence, TypeVar
from iotsota_models.orm import BaseSQLAlchemyModel
from sqlalchemy import ColumnExpressionArgument, insert, select, update
from sqlalchemy.ext.asyncio import AsyncSession
from src.common.interfaces.crud import AbstractCRUDRepository
ModelType = TypeVar("ModelType", bound=BaseSQLAlchemyModel)
class SQLAlchemyCRUDRepository(AbstractCRUDRepository[ModelType, ColumnExpressionArgument]):
__slots__ = ("_session",)
def __init__(self, session: AsyncSession, model: type[ModelType]) -> None:
super().__init__(model)
self._session = session
async def select(self, *clauses: ColumnExpressionArgument) -> ModelType | None:
stmt = select(self.model).where(*clauses)
return (await self._session.execute(stmt)).scalars().first()
async def select_many(
self, *clauses: ColumnExpressionArgument, offset: Optional[int] = None, limit: Optional[int] = None
) -> Sequence[ModelType]:
stmt = select(self.model).where(*clauses).offset(offset).limit(limit)
return (await self._session.execute(stmt)).scalars().all()
async def create(self, **values: Mapping[str, Any]) -> ModelType | None:
stmt = insert(self.model).values(**values).returning(self.model)
return (await self._session.execute(stmt)).scalars().first()
async def update(self, *clauses: ColumnExpressionArgument, **values: Mapping[str, Any]) -> list[ModelType] | None:
stmt = update(self.model).where(*clauses).values(**values).returning(self.model)
return (await self._session.execute(stmt)).unique().scalars().all()
回溯
File "/Users/oleksiiyudin/Documents/WORK/authentication-service/src/databases/orm/repositories/user/reader.py", line 13, in select_user
return await self.repository.crud.select(clauses)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oleksiiyudin/Documents/WORK/authentication-service/src/databases/orm/repositories/crud.py", line 21, in select
return (await self._session.execute(stmt)).scalars().first()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oleksiiyudin/Library/Caches/pypoetry/virtualenvs/auth_service-v_N9SFpi-py3.12/lib/python3.12/site-packages/sqlalchemy/ext/asyncio/session.py", line 461, in execute
result = await greenlet_spawn(
^^^^^^^^^^^^^^^^^^^^^
File "/Users/oleksiiyudin/Library/Caches/pypoetry/virtualenvs/auth_service-v_N9SFpi-py3.12/lib/python3.12/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 190, in greenlet_spawn
result = context.switch(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oleksiiyudin/Library/Caches/pypoetry/virtualenvs/auth_service-v_N9SFpi-py3.12/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 2306, in execute
return self._execute_internal(
^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oleksiiyudin/Library/Caches/pypoetry/virtualenvs/auth_service-v_N9SFpi-py3.12/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 2181, in _execute_internal
conn = self._connection_for_bind(bind)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/oleksiiyudin/Library/Caches/pypoetry/virtualenvs/auth_service-v_N9SFpi-py3.12/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 2050, in _connection_for_bind
return trans._connection_for_bind(engine, execution_options)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<string>", line 2, in _connection_for_bind
File "/Users/oleksiiyudin/Library/Caches/pypoetry/virtualenvs/auth_service-v_N9SFpi-py3.12/lib/python3.12/site-packages/sqlalchemy/orm/state_changes.py", line 103, in _go
self._raise_for_prerequisite_state(fn.__name__, current_state)
File "/Users/oleksiiyudin/Library/Caches/pypoetry/virtualenvs/auth_service-v_N9SFpi-py3.12/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 945, in _raise_for_prerequisite_state
raise sa_exc.InvalidRequestError(
sqlalchemy.exc.InvalidRequestError: This session is provisioning a new connection; concurrent operations are not permitted (Background on this error at: https://sqlalche.me/e/20/isce)
版本:
Python“^3.12” sqlalchemy“^2.0.29” asyncpg“^0.29.0” 格林莱特“^3.0.3”
尝试将 poolclass=NullPool 设置为创建连接不起作用, IllegalStateChangeError 和并发异常尝试阅读有关此错误的文档,但没有帮助
您在 FastAPI 和 SQLAlchemy 中面临异步数据库操作的常见问题。当您尝试使用同一数据库连接同时执行多个查询时,通常会弹出此错误。确保每个异步操作都等待轮到它,并考虑使用作用域会话或连接池来更好地管理并发数据库操作。这可能有助于避免此类错误并使您的应用程序更加健壮。