SQLALCHEMY 没有利用整个连接池

问题描述 投票:0回答:0

我是 flask 和 sqlalchemy 的新手,正在尝试编写一个可以支持 400 个请求/秒的吞吐量的 API。

我正在使用 sqlalchemy 连接到 clickhouse 数据库。

我的 sqlalchemy 设置是:

 SQLALCHEMY_BINDS_OPTIONS = {
        'db': {
            'pool_size': 150,
            'echo_pool': True,
            'max_overflow': 15,
            'pool_pre_ping': True,

        }
    }

我正在打印由 sqlalchemy 建立并通过在 logger.py 中执行此操作回收的无连接:

class ConnectionPoolHandler(logging.Handler):
    def __init__(self):
        super().__init__()
        self.open_connections = 0
        self.recycled_connections = 0

    def emit(self, record):
        if record.getMessage().startswith('Created new connection'):
            self.open_connections += 1
            print(
                f'{record.getMessage()} (Open connections: {self.open_connections}, Recycled connections: {self.recycled_connections})'
            )
        elif record.getMessage().startswith('Recycling connection'):
            self.recycled_connections += 1

# Set up SQLAlchemy connection pool logging
sqlalchemy_connection_pool_logger = logging.getLogger('sqlalchemy.pool')
sqlalchemy_connection_pool_logger.setLevel(logging.DEBUG)
handler = ConnectionPoolHandler()  # Create an instance of ConnectionPoolHandler
sqlalchemy_connection_pool_logger.addHandler(handler)
sqlalchemy_connection_pool_logger.propagate = False  # disable propagation of messages to parent logger

我使用 gunicorn 作为代理来处理对 flask API 的请求。 我正在使用 threadpooolexecutor 来并行执行查询。

def get_threaded_mock_response():
    try:
        with app.app_context():
            with ThreadPoolExecutor(max_workers=4) as executor:
                get_response_1_future = executor.submit(g1)
                get_response_2_future = executor.submit(g2)
                get_response_3_future = executor.submit(g3)
                get_response_4_future = executor.submit(g4)

        response_1 = get_response_1_future.result()

        response_2 = get_response_2_future.result()

        response_3 = get_response_3_future.result()

        response_4 = get_response_4_future.result()

        response = []
        response.append(response_1)
        response.append(response_2)
        response.append(response_3)
        response.append(response_4)
        response_data = jsonify(response)
        return response_data
    except Exception as e:
        log_app(e, logging.ERROR)
        return {"message": internal_error_message}, 500


def g1():
    time.sleep(0.2)
    return {
        "g1": 1
    }


def g2():
    time.sleep(0.2)
    return {
        "g2": 2
    }


def g3():
    time.sleep(0.2)
    return {
        "g3": 3
    }


def g4():
    time.sleep(0.2)
    return {
        "g4": 4
    }

我有像上面一样并行运行查询的函数。 假设每个查询需要 250 毫秒来执行。

我期待 1 个连接每秒可以处理 4 个查询。 因此每个连接/秒 1 个请求 这意味着我允许的最大池大小为 150

sqlalchemy 应该支持 150 个请求/秒。

但我的观察是:

  1. 1 个工作线程和 1 个线程 - sqlalchemy 始终打开 4 个连接并且没有连接被回收。
  2. 16 个工人和 64 个线程 - 15 个连接是打开的和非回收的。
  3. ch db 指标显示在峰值时最多打开 40 个连接。

我已经独立测试了 flask API 和 CH:

  1. 通过点击上面的模拟端点并获得 400req/sec 吞吐量。
  2. my CH db 配置为有 500 个并行请求。
  3. 我做200个请求的测试,CH占用的cpu不超过25%。

我在这里可以做什么来提高吞吐量。 考虑到我的烧瓶应用程序甚至没有使用 25% 的分配资源,CH db 也没有。

python flask sqlalchemy gunicorn clickhouse
© www.soinside.com 2019 - 2024. All rights reserved.