我是 flask 和 sqlalchemy 的新手,正在尝试编写一个可以支持 400 个请求/秒的吞吐量的 API。
我正在使用 sqlalchemy 连接到 clickhouse 数据库。
我的 sqlalchemy 设置是:
SQLALCHEMY_BINDS_OPTIONS = {
'db': {
'pool_size': 150,
'echo_pool': True,
'max_overflow': 15,
'pool_pre_ping': True,
}
}
我正在打印由 sqlalchemy 建立并通过在 logger.py 中执行此操作回收的无连接:
class ConnectionPoolHandler(logging.Handler):
def __init__(self):
super().__init__()
self.open_connections = 0
self.recycled_connections = 0
def emit(self, record):
if record.getMessage().startswith('Created new connection'):
self.open_connections += 1
print(
f'{record.getMessage()} (Open connections: {self.open_connections}, Recycled connections: {self.recycled_connections})'
)
elif record.getMessage().startswith('Recycling connection'):
self.recycled_connections += 1
# Set up SQLAlchemy connection pool logging
sqlalchemy_connection_pool_logger = logging.getLogger('sqlalchemy.pool')
sqlalchemy_connection_pool_logger.setLevel(logging.DEBUG)
handler = ConnectionPoolHandler() # Create an instance of ConnectionPoolHandler
sqlalchemy_connection_pool_logger.addHandler(handler)
sqlalchemy_connection_pool_logger.propagate = False # disable propagation of messages to parent logger
我使用 gunicorn 作为代理来处理对 flask API 的请求。 我正在使用 threadpooolexecutor 来并行执行查询。
def get_threaded_mock_response():
try:
with app.app_context():
with ThreadPoolExecutor(max_workers=4) as executor:
get_response_1_future = executor.submit(g1)
get_response_2_future = executor.submit(g2)
get_response_3_future = executor.submit(g3)
get_response_4_future = executor.submit(g4)
response_1 = get_response_1_future.result()
response_2 = get_response_2_future.result()
response_3 = get_response_3_future.result()
response_4 = get_response_4_future.result()
response = []
response.append(response_1)
response.append(response_2)
response.append(response_3)
response.append(response_4)
response_data = jsonify(response)
return response_data
except Exception as e:
log_app(e, logging.ERROR)
return {"message": internal_error_message}, 500
def g1():
time.sleep(0.2)
return {
"g1": 1
}
def g2():
time.sleep(0.2)
return {
"g2": 2
}
def g3():
time.sleep(0.2)
return {
"g3": 3
}
def g4():
time.sleep(0.2)
return {
"g4": 4
}
我有像上面一样并行运行查询的函数。 假设每个查询需要 250 毫秒来执行。
我期待 1 个连接每秒可以处理 4 个查询。 因此每个连接/秒 1 个请求 这意味着我允许的最大池大小为 150
sqlalchemy 应该支持 150 个请求/秒。
但我的观察是:
我已经独立测试了 flask API 和 CH:
我在这里可以做什么来提高吞吐量。 考虑到我的烧瓶应用程序甚至没有使用 25% 的分配资源,CH db 也没有。