我有一个 psycopg
ConnectionPool
可以扩展 Flask 应用程序:
def init_pool(cfg, connection_class, name) -> ConnectionPool:
pool = ConnectionPool(
conninfo=f"postgresql://{cfg['user']}:{cfg['password']}@{cfg['host']}/{cfg['database']}",
min_size=cfg['min_pool_size'],
max_size=cfg['max_pool_size'],
connection_class=connection_class,
kwargs={"autocommit": True, "options": "-c idle_session_timeout=0"},
name=name
)
pool.wait(timeout=30.0)
with pool.connection() as conn:
# retrieve a connection to register the user-defined types
# in the global scope of the application (*default*)
register_composite_types(conn)
return pool
def register_composite_types(conn):
""" Convert postgresql type -> python
* Register the shared composite types in the conn scope.
* Utilizes the built-in psycopg factory call
"""
with conn.cursor() as cur:
for (t,) in cur.execute("select shared_types()"):
if not conn.broken:
info = CompositeInfo.fetch(conn, t)
register_composite(info, context=None)
else:
raise LevelsDbException(project_id=None, filename=None,
message="Failed to initialize: Broken connection")
我正在使用一个自定义任务类来注入 Flask 应用程序上下文(根据 flask+celery 的最新文档):
def celery_init(app: Flask) -> Celery:
class FlaskTask(Task):
def __call__(self, *args: object, **kwargs: object) -> object:
with app.app_context():
return self.run(*args, **kwargs)
# instantiated once for each task. Each task serves multiple task requests.
celery_app = Celery(app.name, task_cls=FlaskTask)
celery_app.config_from_object(celeryconfig)
celery_app.set_default()
app.extensions["celery"] = celery_app
return celery_app
我有一个
shared_task
通过 Flask 代理使用池,current_app
:
with current_app.inspection_db_pool.connection() as conn:
field_data = db.get_file_fields( conn
, project_id
, path
, hashstr
)
# field_data (a Cursor returned from `conn.execute`)
fields: list[dict] = fields_from_levels_db(field_data)
# ...
def fields_from_levels_db(file_fields: Iterable) -> list[dict]:
""" file_fields is a Cursor """
return [ dict(
idx = field.field_idx,
purpose = field.purpose,
...
levels = field.levels
) for (field,) in file_fields ]
shared_task
功能在 Flask 应用程序中按预期工作。但是,作为使用 FlaskTask
配置的芹菜工作人员,我收到以下错误(field_idx
应该是字典键):
AttributeError: 'str' object has no attribute 'field_idx'
我的故障排除工作包括通过
self.flask_app.inspection_pool
引用池。我也遇到同样的错误。
一般来说,当 postgresql 用户类型未在当前范围内注册和/或可用时,就会发生此类错误。我是否在相关范围内正确注册了用户类型?
依赖全局范围来维护初始化的适配器在 celery 上下文中失败。
为了确保池中的每个连接都有对适配器的有效引用,我使用了
ConnectionPool
configure
设置。该设置是一个回调,它会在连接在池中可用之前改变连接。回调包括使用 psycopg.types.composite.register_composite
调用 context=conn
。
def init_inspection_pool(cfg) -> ConnectionPool:
def configure(conn: psycopg.Connection):
register_composite_types(conn, context=conn)
with_features(conn)
return init_pool(cfg, InspectionConnection, 'inspection_pool',
configure=configure)
另一种看起来很有前途的方法是利用继承自
Connection
的可选类。这对于这个目的来说是一个死胡同。我包含一个自定义类只是为了将池与应用程序中的其他池区分开来。