psycopg 停止在 celery 应用程序上下文中反序列化

问题描述 投票:0回答:1

我有一个 psycopg

ConnectionPool
可以扩展 Flask 应用程序:

def init_pool(cfg, connection_class, name) -> ConnectionPool:

    pool = ConnectionPool(
        conninfo=f"postgresql://{cfg['user']}:{cfg['password']}@{cfg['host']}/{cfg['database']}",
        min_size=cfg['min_pool_size'],
        max_size=cfg['max_pool_size'],
        connection_class=connection_class,
        kwargs={"autocommit": True, "options": "-c idle_session_timeout=0"},
        name=name
    )
    pool.wait(timeout=30.0)

    with pool.connection() as conn:
        # retrieve a connection to register the user-defined types
        # in the global scope of the application (*default*)
        register_composite_types(conn)


    return pool

def register_composite_types(conn):
    """ Convert postgresql type -> python
        * Register the shared composite types in the conn scope.
        * Utilizes the built-in psycopg factory call
    """
    with conn.cursor() as cur:
        for (t,) in cur.execute("select shared_types()"):
            if not conn.broken:
                info = CompositeInfo.fetch(conn, t)
                register_composite(info, context=None)
            else:
                raise LevelsDbException(project_id=None, filename=None,
                                        message="Failed to initialize: Broken connection")


我正在使用一个自定义任务类来注入 Flask 应用程序上下文(根据 flask+celery 的最新文档):

def celery_init(app: Flask) -> Celery:

    class FlaskTask(Task):
        def __call__(self, *args: object, **kwargs: object) -> object:
            with app.app_context():
                return self.run(*args, **kwargs)

    # instantiated once for each task.  Each task serves multiple task requests.
    celery_app = Celery(app.name, task_cls=FlaskTask)
    celery_app.config_from_object(celeryconfig)
    celery_app.set_default()
    app.extensions["celery"] = celery_app

    return celery_app

我有一个

shared_task
通过 Flask 代理使用池,
current_app
:

with current_app.inspection_db_pool.connection() as conn:
    field_data = db.get_file_fields( conn
                                   , project_id
                                   , path
                                   , hashstr
                                   )

# field_data (a Cursor returned from `conn.execute`)
fields: list[dict] = fields_from_levels_db(field_data)

# ... 
def fields_from_levels_db(file_fields: Iterable) -> list[dict]:
    """ file_fields is a Cursor """
    return [ dict(
        idx = field.field_idx,
        purpose = field.purpose,
        ...
        levels = field.levels

    ) for (field,) in file_fields ]

shared_task
功能在 Flask 应用程序中按预期工作。但是,作为使用
FlaskTask
配置的芹菜工作人员,我收到以下错误(
field_idx
应该是字典键):

AttributeError: 'str' object has no attribute 'field_idx'

我的故障排除工作包括通过

self.flask_app.inspection_pool
引用池。我也遇到同样的错误。

一般来说,当 postgresql 用户类型未在当前范围内注册和/或可用时,就会发生此类错误。我是否在相关范围内正确注册了用户类型?

python python-3.x flask celery-task psycopg3
1个回答
0
投票

依赖全局范围来维护初始化的适配器在 celery 上下文中失败。

为了确保池中的每个连接都有对适配器的有效引用,我使用了

ConnectionPool
configure
设置。该设置是一个回调,它会在连接在池中可用之前改变连接。回调包括使用
psycopg.types.composite.register_composite
调用
context=conn

def init_inspection_pool(cfg) -> ConnectionPool:
    def configure(conn: psycopg.Connection):
        register_composite_types(conn, context=conn)
        with_features(conn)

    return init_pool(cfg, InspectionConnection, 'inspection_pool',
                     configure=configure)

另一种看起来很有前途的方法是利用继承自

Connection
的可选类。这对于这个目的来说是一个死胡同。我包含一个自定义类只是为了将池与应用程序中的其他池区分开来。

© www.soinside.com 2019 - 2024. All rights reserved.