I'm going crazy.
Here is my complete main.py. It can be run locally with functions-framework --target=api, or directly on Google Cloud:
import functions_framework
import sqlalchemy
import threading
from google.cloud.sql.connector import Connector, IPTypes
from sqlalchemy.orm import sessionmaker, scoped_session

Base = sqlalchemy.orm.declarative_base()

class TestUsers(Base):
    __tablename__ = 'TestUsers'
    uuid = sqlalchemy.Column(sqlalchemy.String, primary_key=True)

cloud_sql_connection_name = "myproject-123456:asia-northeast3:tosmedb"

# Google Cloud SQL connector
connector = Connector()

def getconn():
    connection = connector.connect(
        cloud_sql_connection_name,
        "pg8000",
        user="postgres",
        password="redacted",
        db="tosme",
        ip_type=IPTypes.PUBLIC,
    )
    return connection

def init_pool():
    engine_url = sqlalchemy.engine.url.URL.create(
        "postgresql+pg8000",
        username="postgres",
        password="redacted",
        host=cloud_sql_connection_name,
        database="tosme"
    )
    engine = sqlalchemy.create_engine(engine_url, creator=getconn)
    # Create tables if they don't exist
    Base.metadata.create_all(engine)
    return engine

engine = init_pool()
# Prepare a thread-safe Session maker
Session = scoped_session(sessionmaker(bind=engine))
print("Database initialized")

def run_concurrency_test():
    def get_user():
        with Session() as session:
            session.query(TestUsers).first()

    print("Simulating concurrent reads...")
    threads = []
    for i in range(2):
        thread = threading.Thread(target=get_user)
        threads.append(thread)
        thread.start()
    # Wait for all threads to complete
    for thread in threads:
        thread.join()
        print(f"Thread {thread.name} completed")
    print("Test passed - Threads all completed!\n")

run_concurrency_test()

@functions_framework.http
def api(request):
    print("API hit - Calling run_concurrency_test()...")
    run_concurrency_test()
    return "Success"
requirements.txt:
functions-framework==3.*
cloud-sql-python-connector[pg8000]==1.5.*
SQLAlchemy==2.*
pg8000==1.*
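Very simple, and it works! As long as you have a PostgreSQL instance, it creates the TestUsers table if needed, queries it twice (concurrently, via threads!), and works every time you curl it. Here is some sample output: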
Database initialized
Simulating concurrent reads...
Thread Thread-4 (get_user) completed
Thread Thread-5 (get_user) completed
Test passed - Threads all completed!
API hit - Calling run_concurrency_test()...
Simulating concurrent reads...
Thread Thread-7 (get_user) completed
Thread Thread-8 (get_user) completed
Test passed - Threads all completed!
However, if I comment out the first call to run_concurrency_test() (i.e. the one that is not inside api(request)), run it, and curl it, I get:
Database initialized
API hit - Calling run_concurrency_test()...
Simulating concurrent reads...
Thread Thread-4 (get_user) completed
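It gets stuck! Specifically, it hangs at session.query(TestUsers).first(). It did not hang when I ran the concurrency test outside api() first. As far as I can tell, my code is stateless and thread-safe. So what is happening here that suddenly makes it stop working?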
I may be way off here (this could be a bigger problem, maybe undefined behavior?), but if nobody else answers, try this:
Change
# Google Cloud SQL connector
connector = Connector()

def getconn():
    connection = connector.connect(
        cloud_sql_connection_name,
        "pg8000",
        user="postgres",
        password="redacted",
        db="tosme",
        ip_type=IPTypes.PUBLIC,
    )
    return connection
to
def getconn():
    # Google Cloud SQL connector
    connector = Connector()
    connection = connector.connect(
        cloud_sql_connection_name,
        "pg8000",
        user="postgres",
        password="redacted",
        db="tosme",
        ip_type=IPTypes.PUBLIC,
    )
    return connection
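Basically, if you look closely at the code, getconn is passed as an argument to create_engine; it is not called directly. With some logging, I found that getconn is actually invoked when Session() is used.
However, getconn uses the same Connector() object every time. You are calling Session() from multiple threads, which keeps touching a global object that is referenced as some internal state, so it is not surprising that you see strange behavior.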