使用sqlalchemy会话的Celery子任务会引发UnboundExecutionError

问题描述 投票:0回答:1

我有一个芹菜任务,触发了它下面的一些子任务。它运行那些非同步的子任务。

@shared_task
def task_update_all_customers(customer_ids=()):
    job = group(
        task_update_customer.subtask((cust_id, ))
        for cust_id in customer_ids
    )
    result = job.apply()
    all_completed = result.ready()
    return all_completed


@shared_task
def task_update_customer(customer_id):
    with session_scope() as session:
        num = (
            session.query(SomeModel)
            .filter(SomeModel.customer_id == customer_id)
            .statement.with_only_columns([func.count()]).order_by(None).scalar()
        )
        print(num)

由于某些原因,当子任务def task_update_customer执行时,它会在sqlalchemy查询上引发:

UnboundExecutionError`:此AnnotatedSelect对象未直接绑定到Connection或Engine。使用Connection或Engine的.execute()方法来执行此构造。

但是我正在使用session_scope,而且我还对“task_update_customer”进行了完整的集成测试,该测试按预期工作。

这是session_scope的实现:https://pastebin.com/p7DRvkFs

为什么会这样?

python sqlalchemy celery
1个回答
0
投票

将我的查询更改为“修复”它

    num = (
        session.query(SomeModel)
        .filter(SomeModel.customer_id == customer_id)
        .count()
    )
© www.soinside.com 2019 - 2024. All rights reserved.