我有一个芹菜任务,触发了它下面的一些子任务。它运行那些非同步的子任务。
@shared_task
def task_update_all_customers(customer_ids=()):
job = group(
task_update_customer.subtask((cust_id, ))
for cust_id in customer_ids
)
result = job.apply()
all_completed = result.ready()
return all_completed
@shared_task
def task_update_customer(customer_id):
with session_scope() as session:
num = (
session.query(SomeModel)
.filter(SomeModel.customer_id == customer_id)
.statement.with_only_columns([func.count()]).order_by(None).scalar()
)
print(num)
由于某些原因,当子任务def task_update_customer
执行时,它会在sqlalchemy查询上引发:
UnboundExecutionError`:此AnnotatedSelect对象未直接绑定到Connection或Engine。使用Connection或Engine的.execute()方法来执行此构造。
但是我正在使用session_scope
,而且我还对“task_update_customer”进行了完整的集成测试,该测试按预期工作。
这是session_scope
的实现:https://pastebin.com/p7DRvkFs
为什么会这样?
将我的查询更改为“修复”它
num = (
session.query(SomeModel)
.filter(SomeModel.customer_id == customer_id)
.count()
)