我用 Python 编写了第二代 Google Cloud Function,它从外部 API 提取数据,并将其写入 Google Cloud SQL 中托管的 PostgreSQL 数据库。当我尝试运行该函数时,它超时但没有更改数据库中的任何内容。我添加了一些 print() 语句来查看它失败的地方。 sqlalchemy“engine.create()”似乎没有问题,但似乎没有通过第一个 session.execute(stmt)。然而,使用函数框架在本地运行相同的函数(同时连接到云数据库)没有问题。我正在使用 Python cloud.sql.connector 库(带有 pg8000 驱动程序)连接到数据库。
老实说,我无法解释为什么这不起作用,或者问题可能是什么。我尝试使用具有全局所有者角色的功能的服务帐户;当我在本地以及另一个云功能中运行数据库凭据时,它可以工作;我尝试使用云 shell 中的
curl -H "Authorization: Bearer $(gcloud auth print-identity-token)" https://FUNCTION_URL
以及本地计算机中的 gcloud function call
运行该函数,结果相同。我希望该函数要么完成要么崩溃,但它只是坐在那里直到超时。我认为这个函数与连接到同一实例的另一个函数之间的唯一区别是,这个函数是第二代云函数,并使用连接器库而不是 unix 套接字来连接到数据库。相同项目、相同区域、相同数据库实例、相同 GCP 服务帐户、相同 SQL 登录。
相关的(我认为)代码;创建引擎的位:
connector = Connector()
def getconn() -> pg8000.dbapi.Connection:
conn: pg8000.dbapi.Connection = connector.connect(
instance_connection_name,
"pg8000",
user=username,
password=password,
db=database,
ip_type=IPTypes.PUBLIC,
)
return conn
print("creating db connection...")
# create a connection to the database
engine = create_engine("postgresql+pg8000://",
creator=getconn,
echo=False)
Session = sessionmaker(bind=engine)
session = Session()
print("db con ok")
无法完成的方法(永远不会打印“ok”):
def wipe_table(name):
print(f"Wiping staging table {name}...")
stmt = text(f"TRUNCATE {name};")
session.execute(stmt)
print("ok")
这很可能与Cloud Function 无法通过“上游请求超时”连接到 Cloud SQL 相同的问题(我建议使用完整的 Cloud Function 代码更新帖子以允许重现错误)。
TLDR; Cloud SQL Python
Connector
对象或 SQLAlchemy 连接池在与 Cloud Functions 一起使用时应延迟初始化,因为它运行后台任务。
事实证明,运行后台任务的全局变量在 Cloud Function 请求上下文之外运行时可能会导致问题(因为 Cloud Functions 仅在发出第一个请求时分配计算)。因此,Cloud Functions 建议延迟初始化这种类型的全局变量,以便在请求上下文中初始化该变量。
延迟初始化连接池示例如下:
import functions_framework
import sqlalchemy
from google.cloud.sql.connector import Connector, IPTypes
import pg8000
def connect_to_instance() -> sqlalchemy.engine.base.Engine:
connector = Connector()
def getconn() -> pg8000.dbapi.Connection:
return connector.connect(
"...", # the PostgreSQL's instance connection name here
"pg8000",
user = "xyz",
password = 'supersecret',
db = "db_name",
ip_type = IPTypes.PUBLIC
)
return sqlalchemy.create_engine(
"postgresql+pg8000://",
creator = getconn,
)
# lazy initialization of global db
db = None
@functions_framework.http
def hello_http(request):
# lazy init within request context
global db
if not db:
db = connect_to_instance()
with db.connect() as conn:
# ... run queries
有关同一问题的更多详细信息可以在这里找到:https://github.com/GoogleCloudPlatform/cloud-sql-python-connector/issues/830