异步 SQLAlchemy 引擎 Alembic 迁移

问题描述 投票:0回答:1

问题陈述

我正在将我的

SQLAlchemy
PostgreSQL
驱动程序转换为异步驱动程序,并且我需要使用异步引擎执行
Alembic
迁移。失败了。

到目前为止我所做的事情

  • 查看 Alembic 文档
  • 寻找其他类似的SO线程

错误

2023-12-24 14:36:22 Starting entrypoint.sh
2023-12-24 14:36:29 Database is up and running
2023-12-24 14:36:29 Generating migrations
2023-12-24 14:36:38 Generating /app/alembic/versions/9a4735888d4b_initial_migration.py ...  done
2023-12-24 14:36:41 Running migrations
2023-12-24 14:36:45 Migration completed successfully.
2023-12-24 14:36:45 Seeding with test user
2023-12-24 14:36:49 An error occurred while seeding the expressions: AsyncConnection context has not been started and object has not been awaited.
2023-12-24 14:36:50 Inside start_server function
2023-12-24 14:36:50 Starting ngrok
Authtoken saved to configuration file: /root/.config/ngrok/ngrok.yml                                
2023-12-24 14:36:22 wait-for-it.sh: waiting 60 seconds for db:5432
2023-12-24 14:36:29 wait-for-it.sh: db:5432 is available after 7 seconds
2023-12-24 14:36:38 INFO  [alembic.runtime.migration] Context impl PostgresqlImpl.
2023-12-24 14:36:38 INFO  [alembic.runtime.migration] Will assume transactional DDL.
2023-12-24 14:36:45 INFO  [alembic.runtime.migration] Context impl PostgresqlImpl.
2023-12-24 14:36:45 INFO  [alembic.runtime.migration] Will assume transactional DDL.
2023-12-24 14:36:45 INFO  [alembic.runtime.migration] Running upgrade  -> 9a4735888d4b, Initial migration
2023-12-24 14:36:49 /usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py:775: RuntimeWarning: coroutine 'AsyncConnection.close' was never awaited
2023-12-24 14:36:49   conn.close()

alembic/env.py

from logging.config import fileConfig
import asyncio
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import NullPool
from alembic import context
from database.database_config import Base, db_url
from services.utils import logger
import traceback

config = context.config
fileConfig(config.config_file_name)
target_metadata = Base.metadata

if db_url:
    config.set_main_option("sqlalchemy.url", db_url)

def do_run_migrations(connection):
    try:
        context.configure(
            connection=connection,
            target_metadata=target_metadata
        )

        with context.begin_transaction():
            context.run_migrations()
    except Exception as e:
        logger.error(traceback.format_exc())
        raise

async def run_async_migrations():
    connectable = create_async_engine(db_url, poolclass=NullPool)

    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)

    await connectable.dispose()

def run_migrations_online():
    asyncio.run(run_async_migrations())

run_migrations_online()

下面的代码用于设置异步引擎

database_config.py

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession

from services.utils import logger
import traceback
from config.env_var import *

DB_USER = os.getenv('DB_USER')
DB_PASSWORD = os.getenv('DB_PASSWORD')
DB_HOST = os.getenv('DB_HOST')
DB_NAME = os.getenv('DB_NAME')

Base = declarative_base()


db_url = f'postgresql+asyncpg://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:5432/{DB_NAME}'

try:
    engine = create_async_engine(db_url, echo=True)
except Exception as e:
    logger.info(f"Error creating database engine: {e}")
    logger.info(traceback.format_exc())
    raise

AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

async def get_db():
    db = AsyncSessionLocal()
    try:
        yield db
    except Exception as e:
        logger.info(f"Failed with db_url: {db_url}")
        logger.info(f"Database session error: {e}")
        logger.info(traceback.format_exc())
        raise
    finally:
        await db.close()

最后是我的 init_db.py,我在本地开发模式下运行了一次:

import asyncio
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

from database.models import *
from database.enums import *
from database.database_config import Base, engine, db_url

async def create_tables():
    # Use the async engine from your database configuration
    async_engine = create_async_engine(db_url)

    # Asynchronous table creation
    async with async_engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

if __name__ == "__main__":
    asyncio.run(create_tables())

如何正确创建异步迁移?

postgresql sqlalchemy python-asyncio fastapi alembic
1个回答
0
投票

这个问题中存在一些误解。

首先,

init_db.py
是多余的,因为 Alembic 负责初始迁移。所以把它从我的流程中删除了。

然后,我在我的database_config.py中导入了所有的SQL Alchemy模型和枚举类型,重新运行脚本并且迁移成功。

© www.soinside.com 2019 - 2024. All rights reserved.