这是负责利用 Google 连接器管理我们的数据库连接池的代码片段:
class DatabaseConnection:
_instance = None
pool: Engine = None
def __new__(cls):
if cls._instance is None:
cls._instance = super(DatabaseConnection, cls).__new__(cls)
return cls._instance
@classmethod
def initialize_pool(cls):
if cls.pool is None:
try:
settings = Settings()
# Initialize Cloud SQL Connector
connector = Connector()
def getconn() -> pg8000.dbapi.Connection:
# Use the connector for Cloud SQL
return connector.connect(
settings.DB_CONNECTION_NAME,
"pg8000",
user=settings.DB_USER,
password=settings.DB_PASSWORD,
db=settings.DB_NAME,
ip_type=IPTypes.PUBLIC,
)
# Use 'creator' to integrate Cloud SQL Connector with SQLAlchemy
cls.pool = create_engine(
"postgresql+pg8000://",
creator=getconn,
pool_size=10,
max_overflow=1,
pool_timeout=30,
pool_recycle=1080,
pool_pre_ping=True,
)
print("Database pool initialized")
except Exception as e:
print("Failed to initialize database pool")
traceback.print_exc()
@classmethod
def dispose_pool(cls):
if cls.pool is not None:
cls.pool.dispose()
cls.pool = None
print("Database pool disposed")
def get_connection(self):
if self.pool:
return self.pool.connect()
raise SQLAlchemyError("Database pool not initialized")
我的 FastAPI 代码在启动期间初始化池,并在每个路由中调用 get_connection() 方法:
@app.on_event("startup")
async def startup_event():
# Initialize the database connection pool
DatabaseConnection.initialize_pool()
@app.on_event("shutdown")
async def shutdown_event():
# Dispose of the database connection pool
DatabaseConnection.dispose_pool()
调用 get_connection() 的示例路由:
with DatabaseConnection().get_connection() as postgre_db_connection:
query_string = text(
"SELECT * FROM public.brand_data"
)
res = postgre_db_connection.execute(query_string)
query_result_single_row = res.fetchone()
尽管我尽了最大努力,但此错误的随机性使得一致重现变得困难,这反过来又使调试工作同样难以捉摸。
如果有人遇到类似问题或在这方面有专业知识,我们将非常感谢您的指导和建议。此外,如果您需要任何具体细节或日志来进一步帮助我,请告诉我。
谢谢
当我尝试连接 pg8000 驱动程序时,我遇到了同样的错误,因此我将驱动程序更改为 psycopg2 并使用了 UNIX 套接字,
import sqlalchemy
from sqlalchemy import URL
def connect_unix_socket() -> sqlalchemy.engine.base.Engine:
try:
pool = sqlalchemy.create_engine(
URL.create(
drivername="postgresql+psycopg2",
username=db_user,
password=db_pass,
database=db_name,
host=unix_socket_path,
),
)
except:
print('Something went wrong!')
return pool
unix 套接字路径看起来像这样 '/cloudsql/INSTANCE_CONNECTION_NAME'。您可以在 Google Cloud SQL 的数据库列表中找到您的 INSTANCE_CONNECTION_NAME(更多信息请参见此处与 Unix 套接字连接)
您还需要在 Google Cloud Run 服务中激活 SQL 连接。那么你就不需要公共IP了。
某些驱动程序需要在 unix_socket_path 末尾添加“.s.PGSQL.5432”,例如 pg8000,但 psycopg2 无需像我的示例中那样工作
我使用的requirements.txt文件
psycopg2-binary==2.9.9
SQLAlchemy==2.0.30