使用 SQLAlchemy,创建一个 Engine 对象,如下所示:
from sqlalchemy import create_engine
engine = create_engine("postgresql://localhost/mydb")
如果 engine
的参数中指定的数据库(在本例中为
create_engine
)不存在,则访问
mydb
会失败。如果指定的数据库不存在,是否可以告诉 SQLAlchemy 创建一个新数据库?
SQLAlchemy-Utils为SQLAlchemy提供自定义数据类型和各种实用函数。您可以使用 pip 安装最新的官方版本:
pip install sqlalchemy-utils
数据库助手包含一个create_database
函数:
from sqlalchemy import create_engine
from sqlalchemy_utils import database_exists, create_database
engine = create_engine("postgres://localhost/mydb")
if not database_exists(engine.url):
create_database(engine.url)
print(database_exists(engine.url))
postgres
角色),则可以连接到
postgres
或
template1
数据库。默认的 pg_hba.conf 只允许名为
postgres
的 unix 用户使用
postgres
角色,所以最简单的事情就是成为该用户。无论如何,请像往常一样使用有权创建数据库的用户创建引擎:
>>> engine = sqlalchemy.create_engine("postgres://postgres@/postgres")
但是,您不能使用
engine.execute()
,因为 postgres 不允许您在事务内创建数据库,而 sqlalchemy 总是尝试在事务中运行查询。要解决这个问题,请从引擎获取底层连接:
>>> conn = engine.connect()
但连接仍将在事务内,因此您必须使用
commit
:结束打开的事务
>>> conn.execute("commit")
然后您可以继续使用正确的 PostgreSQL 命令创建数据库。
>>> conn.execute("create database test")
>>> conn.close()
isolation_level='AUTOCOMMIT'
到
create_engine
功能,可以在创建数据库时避免手动事务管理:
import sqlalchemy
with sqlalchemy.create_engine(
'postgresql:///postgres',
isolation_level='AUTOCOMMIT'
).connect() as connection:
connection.execute('CREATE DATABASE my_database')
此外,如果您不确定数据库不存在,有一种方法可以通过抑制
sqlalchemy.exc.ProgrammingError
异常来忽略由于存在而导致的数据库创建错误:
import contextlib
import sqlalchemy.exc
with contextlib.suppress(sqlalchemy.exc.ProgrammingError):
# creating database as above
with
扩展接受的答案会产生:
from sqlalchemy import create_engine
engine = create_engine("postgresql://localhost")
NEW_DB_NAME = 'database_name'
with engine.connect() as conn:
conn.execute("commit")
# Do not substitute user-supplied database names here.
conn.execute(f"CREATE DATABASE {NEW_DB_NAME}")
database_exists
获得上述建议,因为每当我使用 if not
database_exists(engine.url):
检查数据库是否存在时,我都会收到此错误:
InterfaceError('(pyodbc.InterfaceError) (\'28000\', u\'[28000] [Microsoft][SQL Server Native Client 11.0][SQL Server]登录失败 用户“myUser”。 (18456) (SQLDriverConnect); [28000] [Microsoft][SQL Server Native Client 11.0][SQL Server]无法打开 登录请求的数据库“MY_DATABASE”。登录失败。 (4060); [28000] [微软][SQL Server Native Client 11.0][SQL 服务器]用户“myUser”登录失败。 (18456); [28000] [Microsoft][SQL Server Native Client 11.0][SQL Server]无法打开 登录请求的数据库“MY_DATABASE”。登录失败。 (4060)\')',)另外
contextlib/suppress
不起作用,而且我没有使用
postgres
所以我最终这样做是为了忽略该异常(如果数据库恰好已经存在于 SQL Server 中):
import logging
import sqlalchemy
logging.basicConfig(filename='app.log', format='%(asctime)s-%(levelname)s-%(message)s', level=logging.DEBUG)
engine = create_engine('mssql+pyodbc://myUser:mypwd@localhost:1234/MY_DATABASE?driver=SQL+Server+Native+Client+11.0?trusted_connection=yes', isolation_level = "AUTOCOMMIT")
try:
engine.execute('CREATE DATABASE ' + a_database_name)
except Exception as db_exc:
logging.exception("Exception creating database: " + str(db_exc))
SingleNegationElimination 的答案来接受它。我在这里使用 pydantic(它是 FastAPI 项目)和我导入的设置供参考,但您可以轻松更改此设置:
from sqlalchemy import create_engine
from sqlalchemy.exc import OperationalError
from pydantic import PostgresDsn
from src.conf import settings
def build_db_connection_url(custom_db: Optional[str] = None):
db_name = f"/{settings.POSTGRES_DB or ''}" if custom_db is None else "/" + custom_db
return PostgresDsn.build(
scheme='postgresql+psycopg2',
user=settings.POSTGRES_USER,
password=settings.POSTGRES_PASSWORD,
host=settings.POSTGRES_HOST,
path=db_name,
)
def create_database(db_name: str):
try:
eng = create_engine(build_db_connection_url(custom_db=db_name))
conn = eng.connect()
conn.close()
except OperationalError as exc:
if "does not exist" in exc.__str__():
eng = create_engine(build_db_connection_url(custom_db="postgres"))
conn = eng.connect()
conn.execute("commit")
conn.execute(f"create database {db_name}")
conn.close()
print(f"Database {db_name} created")
else:
raise exc
eng.dispose()
create_database("test_database")
base_url = 'postgresql://postgres:admin@postgres:5432'
engine = create_engine(base_url)
# Create database if not exists
dbname = f"{namespace}_{name}"
with engine.connect() as conn:
conn.execute(text(f"SELECT 'CREATE DATABASE {dbname}' WHERE NOT EXISTS (SELECT FROM pg_database WHERE datname = '{dbname}')"))