FastAPI、SQLAlchemy、asyncio,此会话正在配置新连接;不允许并发操作

问题描述 投票:0回答:1

连接.py

from typing import AsyncIterable
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy import NullPool

# Alias for the session-factory type used below: an async_sessionmaker producing AsyncSession objects.
SessionFactoryType = async_sessionmaker[AsyncSession]

def create_session_factory(connection_url: str, **kwargs: object) -> async_sessionmaker[AsyncSession]:
    """Build an ``async_sessionmaker`` bound to a newly created async engine.

    Args:
        connection_url: Database URL handed to ``create_async_engine``.
        **kwargs: Extra keyword arguments forwarded to ``create_async_engine``.
            ``poolclass`` defaults to ``NullPool`` and may be overridden here.

    Returns:
        A session factory configured with ``autoflush=False`` and
        ``expire_on_commit=False``.
    """
    # NullPool is applied as a default instead of a second explicit keyword, so
    # a caller-supplied ``poolclass`` no longer fails with
    # "got multiple values for keyword argument 'poolclass'".
    engine_options = {"poolclass": NullPool, **kwargs}
    async_engine = create_async_engine(url=connection_url, **engine_options)
    return async_sessionmaker(async_engine, autoflush=False, expire_on_commit=False)

async def create_async_session(session_factory: SessionFactoryType) -> AsyncIterable[AsyncSession]:
    """Yield a single session obtained from ``session_factory``.

    The object returned by the factory is entered as an async context manager
    before being yielded and is exited once the generator resumes or is closed.
    """
    session_context = session_factory()
    async with session_context as db_session:
        yield db_session

工作单元.py

import contextlib
import logging

from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession, AsyncSessionTransaction

from src.common.interfaces.unit_of_work import AbstractUnitOfWork

class SQLAlchemyUnitOfWork(AbstractUnitOfWork[AsyncSession, AsyncSessionTransaction]):
    """Unit of work backed by a SQLAlchemy ``AsyncSession``.

    ``commit`` and ``rollback`` never propagate ``SQLAlchemyError``; failures
    are logged instead of being discarded without a trace.
    """

    async def commit(self) -> None:
        """Commit the session, logging (not raising) any ``SQLAlchemyError``."""
        try:
            await self.session.commit()
        except SQLAlchemyError:
            # The non-raising contract is kept for existing callers, but a
            # swallowed commit failure would otherwise look like a successful
            # write, so record it with its traceback.
            logging.getLogger(__name__).exception("Session commit failed")

    async def rollback(self) -> None:
        """Roll the session back, logging (not raising) any ``SQLAlchemyError``."""
        try:
            await self.session.rollback()
        except SQLAlchemyError:
            # Rollback stays best-effort; the failure is only reported.
            logging.getLogger(__name__).exception("Session rollback failed")

    async def create_transaction(self) -> None:
        """Begin a transaction if the session is active and none is open yet."""
        if not self.session.in_transaction() and self.session.is_active:
            self._transaction = await self.session.begin()

    async def close_transaction(self) -> None:
        """Close the session when it is active and no transaction is open.

        Despite the name, this closes the whole session, not only a transaction.
        """
        if self.session.is_active and not self.session.in_transaction():
            await self.session.close()

def unit_of_work_factory(session: AsyncSession) -> SQLAlchemyUnitOfWork:
    """Wrap ``session`` in a new ``SQLAlchemyUnitOfWork``."""
    unit_of_work = SQLAlchemyUnitOfWork(session=session)
    return unit_of_work

interfaces/unit_of_work.py

from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Protocol
from typing_extensions import Self

class UnitOfWork(Protocol):
    @abstractmethod
    async def __aenter__(self: Self) -> Self:
        raise NotImplementedError

    @abstractmethod
    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        raise NotImplementedError

    @abstractmethod
    async def commit(self) -> None:
        raise NotImplementedError

    @abstractmethod
    async def rollback(self) -> None:
        raise NotImplementedError

    @abstractmethod
    async def create_transaction(self) -> None:
        raise NotImplementedError

    @abstractmethod
    async def close_transaction(self) -> None:
        raise NotImplementedError

class AbstractUnitOfWork[SessionType, TransactionType](ABC, UnitOfWork):
    __slots__ = ("session", "_transaction")

    def __init__(self, session: SessionType) -> None:
        self.session: SessionType = session
        self._transaction: TransactionType | None = None

    async def __aenter__(self) -> AbstractUnitOfWork[SessionType, TransactionType]:
        await self.create_transaction()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        if self._transaction:
            if exc_type:
                await self.rollback()
            else:
                await self.commit()

        await self.close_transaction()

交互器.py

from typing import Type
from sqlalchemy.ext.asyncio import AsyncSession
from src.common.interfaces.repository import Repository
from src.databases.orm.repositories.crud import SQLAlchemyCRUDRepository

class BaseRepository[ModelType](Repository):
    __slots__ = ("model", "_session", "crud")

    model: Type[ModelType]

    def __init__(self, session: AsyncSession) -> None:
        self._session = session
        self.crud = SQLAlchemyCRUDRepository(session=session, model=self.model)

class BaseInteractor[ModelType]:
    __slots__ = ("repository",)

    def __init__(self, repository: BaseRepository[ModelType]) -> None:
        self.repository = repository

crud.py

from typing import Any, Mapping, Optional, Sequence, TypeVar
from iotsota_models.orm import BaseSQLAlchemyModel
from sqlalchemy import ColumnExpressionArgument, insert, select, update
from sqlalchemy.ext.asyncio import AsyncSession
from src.common.interfaces.crud import AbstractCRUDRepository

# Type variable for the ORM model class a repository works with; bounded to BaseSQLAlchemyModel.
ModelType = TypeVar("ModelType", bound=BaseSQLAlchemyModel)

class SQLAlchemyCRUDRepository(AbstractCRUDRepository[ModelType, ColumnExpressionArgument]):
    """CRUD helper that executes statements for ``model`` through one ``AsyncSession``.

    NOTE: every method awaits ``self._session.execute``. An ``AsyncSession``
    must not be used by several tasks at once; awaiting these methods
    concurrently on the same session (e.g. via ``asyncio.gather``) produces
    "This session is provisioning a new connection; concurrent operations
    are not permitted".
    """

    __slots__ = ("_session",)

    def __init__(self, session: AsyncSession, model: type[ModelType]) -> None:
        """Pass ``model`` to the base class and keep ``session`` for execution."""
        # ``self.model`` used below is presumably set by the base initializer.
        super().__init__(model)
        self._session = session

    async def select(self, *clauses: ColumnExpressionArgument) -> ModelType | None:
        """Return the first row matching all ``clauses``, or ``None`` if there is none.

        Each clause must be passed as a separate positional argument.
        """
        stmt = select(self.model).where(*clauses)
        return (await self._session.execute(stmt)).scalars().first()

    async def select_many(
        self, *clauses: ColumnExpressionArgument, offset: Optional[int] = None, limit: Optional[int] = None
    ) -> Sequence[ModelType]:
        """Return all rows matching ``clauses``, optionally windowed by ``offset``/``limit``."""
        stmt = select(self.model).where(*clauses).offset(offset).limit(limit)
        return (await self._session.execute(stmt)).scalars().all()

    async def create(self, **values: Any) -> ModelType | None:
        """Insert one row built from ``values`` and return it via ``RETURNING``.

        ``values`` maps column names to column values, so each keyword value is
        annotated ``Any`` rather than as a mapping.
        """
        stmt = insert(self.model).values(**values).returning(self.model)
        return (await self._session.execute(stmt)).scalars().first()

    async def update(self, *clauses: ColumnExpressionArgument, **values: Any) -> Sequence[ModelType]:
        """Update rows matching ``clauses`` with ``values`` and return the updated rows.

        The result is a (possibly empty) sequence; it is never ``None``.
        """
        stmt = update(self.model).where(*clauses).values(**values).returning(self.model)
        return (await self._session.execute(stmt)).unique().scalars().all()

回溯

  File "/Users/oleksiiyudin/Documents/WORK/authentication-service/src/databases/orm/repositories/user/reader.py", line 13, in select_user
    return await self.repository.crud.select(clauses)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/oleksiiyudin/Documents/WORK/authentication-service/src/databases/orm/repositories/crud.py", line 21, in select
    return (await self._session.execute(stmt)).scalars().first()
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/oleksiiyudin/Library/Caches/pypoetry/virtualenvs/auth_service-v_N9SFpi-py3.12/lib/python3.12/site-packages/sqlalchemy/ext/asyncio/session.py", line 461, in execute
    result = await greenlet_spawn(
             ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/oleksiiyudin/Library/Caches/pypoetry/virtualenvs/auth_service-v_N9SFpi-py3.12/lib/python3.12/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 190, in greenlet_spawn
    result = context.switch(*args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/oleksiiyudin/Library/Caches/pypoetry/virtualenvs/auth_service-v_N9SFpi-py3.12/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 2306, in execute
    return self._execute_internal(
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/oleksiiyudin/Library/Caches/pypoetry/virtualenvs/auth_service-v_N9SFpi-py3.12/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 2181, in _execute_internal
    conn = self._connection_for_bind(bind)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/oleksiiyudin/Library/Caches/pypoetry/virtualenvs/auth_service-v_N9SFpi-py3.12/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 2050, in _connection_for_bind
    return trans._connection_for_bind(engine, execution_options)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<string>", line 2, in _connection_for_bind
  File "/Users/oleksiiyudin/Library/Caches/pypoetry/virtualenvs/auth_service-v_N9SFpi-py3.12/lib/python3.12/site-packages/sqlalchemy/orm/state_changes.py", line 103, in _go
    self._raise_for_prerequisite_state(fn.__name__, current_state)
  File "/Users/oleksiiyudin/Library/Caches/pypoetry/virtualenvs/auth_service-v_N9SFpi-py3.12/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 945, in _raise_for_prerequisite_state
    raise sa_exc.InvalidRequestError(
sqlalchemy.exc.InvalidRequestError: This session is provisioning a new connection; concurrent operations are not permitted (Background on this error at: https://sqlalche.me/e/20/isce)

版本:

python "^3.12"、sqlalchemy "^2.0.29"、asyncpg "^0.29.0"、greenlet "^3.0.3"

我尝试在创建引擎时设置 poolclass=NullPool,但没有效果,仍然会出现 IllegalStateChangeError 和并发异常。我也阅读了有关此错误的文档,但没有帮助。

python sqlalchemy python-asyncio fastapi greenlets
1个回答
0
投票

这是在 FastAPI 中使用 SQLAlchemy 异步会话时的常见问题。当多个协程同时在同一个 AsyncSession(即同一个数据库连接)上执行查询时,就会出现此错误,例如用 asyncio.gather 并发执行共享同一会话的多个查询。请确保同一会话上的操作依次 await,或者为每个并发任务创建独立的会话(例如使用作用域会话),并让连接池来管理并发的数据库连接。这有助于避免此类错误,并使您的应用程序更加健壮。

© www.soinside.com 2019 - 2024. All rights reserved.