Gunicorn 具有多种作品但共享内存

问题描述 投票:0回答:1

我的 Falcon + Gunicorn 后端访问一个关系数据库,该数据库保存形成图形的数据 - 一个带有节点的表和一个带有边的表。由于图不大(几百个节点和边),我在启动时将数据加载到 Networkx

DiGraph
中并将其保存在内存中。因此,当我请求图表数据时

  • 不需要持续访问数据库,这听起来不错。性能
  • 轻松使用NetworkX提供的内置方法来提取不同类型的子图

问题 1: 这是一个好的做法吗?这种方法是否有任何(原则上)缺点(超出了我下面的问题)?

虽然图表不是高度动态的——没有“普通”API 端点编辑图表——但它可能会随着时间的推移在数据库中增长。因此,我创建了一个额外的端点

/graph/reload
,在调用时只需将图形重新加载到内存中(即 NetworkX
DiGraph
对象)。因此,每当数据库确实发生更改时,我都会调用此端点。

原则上,这似乎有效。然而,现在的问题是当我使用 Gunicorn 和多个工作人员运行服务器时。当我调用

/graph/reload
时,这仅针对一个工作实例完成;其他人仍然使用旧图。

问题2:处理这个问题的最佳方法是什么?

当然,我始终可以针对每个请求访问数据库。我仍然很好奇是否可以通过将图形保留在内存中来完成

python networkx gunicorn falconframework
1个回答
0
投票

如果您能够负担得起为每个工作人员在内存中保留多个数据库副本,我认为您的方法并非不合理。

如果您希望继续与开始时相同的模式,您可以使用 stdlib 的

multiprocessing
原语(例如
multiprocessing.Condition

)将重新加载事件传达给所有工作人员
  1. 在分叉工作进程之前初始化数据库。
  2. 在分叉工作进程之前创建共享的条件变量
  3. 分叉工作进程后,在每个进程中启动一个线程,通过进程之间共享的条件变量监听更改。
  4. 收到重新加载请求后,从每个工作线程执行重新加载。

我发现使用自定义 Gunicorn 应用程序更容易控制工作人员的分叉和初始化;您的用例的完整代码示例可能如下所示:

import datetime
import json
import logging
import multiprocessing
import os
import pathlib
import random
import threading

import falcon
import gunicorn.app.base

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s', level=logging.INFO)

DATA_SOURCE = pathlib.Path(__file__).parent / 'data.json'


class Database:
    def __init__(self):
        # TODO: Fake data, actually load external resource.
        now = datetime.datetime.now(datetime.timezone.utc)
        self._data = {
            'created': now.isoformat(),
            'numbers': random.sample(range(100, 1000), 16),
        }
        self._cv = multiprocessing.Condition()

        with open(DATA_SOURCE, 'w') as fp:
            json.dump(self._data, fp, indent=4)

    def _reload(self):
        with open(DATA_SOURCE, 'r') as fp:
            self._data = json.load(fp)
        logging.info(f'Reloaded database (PID: {os.getpid()})')

    def start_reload_loop(self):
        def reload_loop():
            while True:
                with self._cv:
                    if self._cv.wait(timeout=3.0):
                        self._reload()

        thread = threading.Thread(target=reload_loop, daemon=True)
        thread.start()
        logging.info(f'Starting reload thread... (PID: {os.getpid()})')

    def on_get(self, req, resp):
        resp.media = self._data

    def on_post_reload(self, req, resp):
        with self._cv:
            self._cv.notify_all()
        resp.status = falcon.HTTP_ACCEPTED


class PIDInfo:
    def process_response(self, req, resp, resource, req_succeeded):
        resp.set_header('X-Gunicorn-Worker', str(os.getpid()))


class ServerApplication(gunicorn.app.base.BaseApplication):
    db = Database()

    @classmethod
    def post_fork(cls, arbiter, worker):
        cls.db.start_reload_loop()

    def __init__(self):
        self.options = {
            'accesslog': '-',
            'bind': '127.0.0.1:8000',
            'post_fork': self.post_fork,
            'workers': 8,
        }

        self.application = app = falcon.App(middleware=[PIDInfo()])
        app.add_route('/database', self.db)
        app.add_route('/database/reload', self.db, suffix='reload')

        super().__init__()

    def load_config(self):
        config = {key: value for key, value in self.options.items()
                  if key in self.cfg.settings and value is not None}
        for key, value in config.items():
            self.cfg.set(key.lower(), value)

    def load(self):
        return self.application


if __name__ == '__main__':
    ServerApplication().run()

启动服务器应用程序后,您可以通过向 http://localhost:8000/database 发送 HTTP

GET
请求来访问我们的假数据库。假数据库是由同一目录中的
data.json
文件模拟的,该文件在启动时会被覆盖。

编辑文件,并对 http://localhost:8000/database/reload 执行 HTTP

POST
请求。您应该在后续
GET
请求中看到更新的数据。

另一种方法是以某种方式将内存数据库存储在

mmap
内存中,该内存在分叉工作人员之前初始化。

如果您想要更简单的代码来避免进程间同步,您还可以将数据缓存在内存中一段时间,并且如果需要,可以有一个单独的事实来源来指示重新加载(即清除缓存而不是使用它)。

© www.soinside.com 2019 - 2024. All rights reserved.