我的 Falcon + Gunicorn 后端访问一个关系数据库,该数据库保存形成图形的数据 - 一个带有节点的表和一个带有边的表。由于图不大(几百个节点和边),我在启动时将数据加载到 Networkx
DiGraph
中并将其保存在内存中。因此,当我请求图表数据时
问题 1: 这是一个好的做法吗?这种方法是否有任何(原则上)缺点(超出了我下面的问题)?
虽然图表不是高度动态的——没有“普通”API 端点编辑图表——但它可能会随着时间的推移在数据库中增长。因此,我创建了一个额外的端点
/graph/reload
,在调用时只需将图形重新加载到内存中(即 NetworkX DiGraph
对象)。因此,每当数据库确实发生更改时,我都会调用此端点。
原则上,这似乎有效。然而,现在的问题是当我使用 Gunicorn 和多个工作人员运行服务器时。当我调用
/graph/reload
时,这仅针对一个工作实例完成;其他人仍然使用旧图。
问题2:处理这个问题的最佳方法是什么?
当然,我始终可以针对每个请求访问数据库。我仍然很好奇是否可以通过将图形保留在内存中来完成
如果您能够负担得起为每个工作人员在内存中保留多个数据库副本,我认为您的方法并非不合理。
如果您希望继续与开始时相同的模式,您可以使用 stdlib 的
multiprocessing
原语(例如 multiprocessing.Condition
:)将重新加载事件传达给所有工作人员
我发现使用自定义 Gunicorn 应用程序更容易控制工作人员的分叉和初始化;您的用例的完整代码示例可能如下所示:
import datetime
import json
import logging
import multiprocessing
import os
import pathlib
import random
import threading
import falcon
import gunicorn.app.base
logging.basicConfig(
format='%(asctime)s [%(levelname)s] %(message)s', level=logging.INFO)
DATA_SOURCE = pathlib.Path(__file__).parent / 'data.json'
class Database:
def __init__(self):
# TODO: Fake data, actually load external resource.
now = datetime.datetime.now(datetime.timezone.utc)
self._data = {
'created': now.isoformat(),
'numbers': random.sample(range(100, 1000), 16),
}
self._cv = multiprocessing.Condition()
with open(DATA_SOURCE, 'w') as fp:
json.dump(self._data, fp, indent=4)
def _reload(self):
with open(DATA_SOURCE, 'r') as fp:
self._data = json.load(fp)
logging.info(f'Reloaded database (PID: {os.getpid()})')
def start_reload_loop(self):
def reload_loop():
while True:
with self._cv:
if self._cv.wait(timeout=3.0):
self._reload()
thread = threading.Thread(target=reload_loop, daemon=True)
thread.start()
logging.info(f'Starting reload thread... (PID: {os.getpid()})')
def on_get(self, req, resp):
resp.media = self._data
def on_post_reload(self, req, resp):
with self._cv:
self._cv.notify_all()
resp.status = falcon.HTTP_ACCEPTED
class PIDInfo:
def process_response(self, req, resp, resource, req_succeeded):
resp.set_header('X-Gunicorn-Worker', str(os.getpid()))
class ServerApplication(gunicorn.app.base.BaseApplication):
db = Database()
@classmethod
def post_fork(cls, arbiter, worker):
cls.db.start_reload_loop()
def __init__(self):
self.options = {
'accesslog': '-',
'bind': '127.0.0.1:8000',
'post_fork': self.post_fork,
'workers': 8,
}
self.application = app = falcon.App(middleware=[PIDInfo()])
app.add_route('/database', self.db)
app.add_route('/database/reload', self.db, suffix='reload')
super().__init__()
def load_config(self):
config = {key: value for key, value in self.options.items()
if key in self.cfg.settings and value is not None}
for key, value in config.items():
self.cfg.set(key.lower(), value)
def load(self):
return self.application
if __name__ == '__main__':
ServerApplication().run()
启动服务器应用程序后,您可以通过向 http://localhost:8000/database 发送 HTTP
GET
请求来访问我们的假数据库。假数据库是由同一目录中的 data.json
文件模拟的,该文件在启动时会被覆盖。
编辑文件,并对 http://localhost:8000/database/reload 执行 HTTP
POST
请求。您应该在后续 GET
请求中看到更新的数据。
另一种方法是以某种方式将内存数据库存储在
mmap
内存中,该内存在分叉工作人员之前初始化。
如果您想要更简单的代码来避免进程间同步,您还可以将数据缓存在内存中一段时间,并且如果需要,可以有一个单独的事实来源来指示重新加载(即清除缓存而不是使用它)。