Cloud Function 无法一次多次调用 PostgreSQL,除非先运行测试查询

问题描述 投票:0回答:1

我要疯了。

这是我的完整内容

main.py
。它可以通过
functions-framework --target=api
在本地运行,也可以直接在 Google Cloud 上运行:

import functions_framework
import sqlalchemy
import threading
from google.cloud.sql.connector import Connector, IPTypes
from sqlalchemy.orm import sessionmaker, scoped_session

Base = sqlalchemy.orm.declarative_base()

class TestUsers(Base):
    __tablename__ = 'TestUsers'
    
    uuid = sqlalchemy.Column(sqlalchemy.String, primary_key=True)

cloud_sql_connection_name = "myproject-123456:asia-northeast3:tosmedb"

# Google Cloud SQL connector
connector = Connector()

def getconn():
    connection = connector.connect(
        cloud_sql_connection_name,
        "pg8000",
        user="postgres",
        password="redacted",
        db="tosme",
        ip_type=IPTypes.PUBLIC,
    )
    return connection

def init_pool():
    engine_url = sqlalchemy.engine.url.URL.create(
        "postgresql+pg8000",
        username="postgres",
        password="redacted",
        host=cloud_sql_connection_name,
        database="tosme"
    )
    engine = sqlalchemy.create_engine(engine_url, creator=getconn)

    # Create tables if they don't exist
    Base.metadata.create_all(engine)
    return engine

engine = init_pool()

# Prepare a thread-safe Session maker
Session = scoped_session(sessionmaker(bind=engine))

print("Database initialized")

def run_concurrency_test():
    def get_user():
      with Session() as session:
          session.query(TestUsers).first()

    print("Simulating concurrent reads...")

    threads = []
    for i in range(2):
        thread = threading.Thread(target=get_user)
        threads.append(thread)
        thread.start()

    # Wait for all threads to complete
    for thread in threads:
        thread.join()
        print(f"Thread {thread.name} completed")

    print("Test passed - Threads all completed!\n")

run_concurrency_test()

@functions_framework.http
def api(request):
    print("API hit - Calling run_concurrency_test()...")
    run_concurrency_test()
    return "Success"

requirements.txt

functions-framework==3.*
cloud-sql-python-connector[pg8000]==1.5.*
SQLAlchemy==2.*
pg8000==1.*

非常简单 - 而且有效!只要你有一个 PostgreSQL 实例,它就会根据需要创建 TestUsers 表,查询两次(同时通过线程!),每次你curl它时,它都会工作。这是一些示例输出:

Database initialized
Simulating concurrent reads...
Thread Thread-4 (get_user) completed
Thread Thread-5 (get_user) completed
Test passed - Threads all completed!

API hit - Calling run_concurrency_test()...
Simulating concurrent reads...
Thread Thread-7 (get_user) completed
Thread Thread-8 (get_user) completed
Test passed - Threads all completed!

但是,如果我注释掉对

run_concurrency_test()
的第一个调用(即不在
api(request)
内的调用),运行它并卷曲,我得到:

Database initialized
API hit - Calling run_concurrency_test()...
Simulating concurrent reads...
Thread Thread-4 (get_user) completed

卡住了!具体来说,它卡在

session.query(TestUsers).first()
处。我先在
api()
之外运行并发测试的时候并没有卡住。据我所知,我的代码是无状态的,并且线程安全的。那么这里发生了什么让它突然不起作用?

python flask sqlalchemy google-cloud-functions google-cloud-sql
1个回答
0
投票

我也可能会画一个更大的错误(未定义的行为?),但如果没有其他人回答,请尝试这个:

改变

# Google Cloud SQL connector
connector = Connector()

def getconn():
    connection = connector.connect(
        cloud_sql_connection_name,
        "pg8000",
        user="postgres",
        password="redacted",
        db="tosme",
        ip_type=IPTypes.PUBLIC,
    )
    return connection

def getconn():
    # Google Cloud SQL connector
    connector = Connector()

    connection = connector.connect(
        cloud_sql_connection_name,
        "pg8000",
        user="postgres",
        password="redacted",
        db="tosme",
        ip_type=IPTypes.PUBLIC,
    )
    return connection

基本上,如果你仔细观察代码,

getconn
create_engine
的参数 - 它不是直接调用的。通过一些日志记录,我发现每次使用 getconn
 时实际上都会调用 
Session()

但是

getconn
每次都使用相同的
Connector()
对象。您从多个线程调用
Session()
,不断覆盖作为某些内部状态引用的全局对象,因此您发现奇怪的行为并不奇怪。

© www.soinside.com 2019 - 2024. All rights reserved.