我正在尝试以编程方式构建搜索查询,为此,我要加入一个表。
class User(db.Model):
id = db.Column(db.Integer(), primary_key=True)
class Tag(db.Model):
id = db.Column(db.Integer(), primary_key=True)
user_id = db.Column(db.Integer(), db.ForeignKey('user.id'))
title = db.Column(db.String(128))
description = db.Column(db.String(128))
这是一个有点人为的例子 - 我希望它有意义。
假设我的搜索功能类似于:
def search(title_arg, desc_arg):
query = User.query
if title_arg:
query = query.join(Tag)
query = query.filter(Tag.title.contains(title_arg))
if desc_arg:
query = query.join(Tag)
query = query.filter(Tag.description.contains(desc_arg))
return query
之前,我跟踪了哪些表已经加入到列表中,如果该表在列表中,则假设它已经加入,然后添加过滤器。
如果我可以查看查询对象,看到
Tag
已经加入,如果是的话就跳过它,那就太酷了。我有一些更复杂的查询构建,它们确实会从中受益。
如果有一个完全不同的策略来为我错过的搜索构建查询,那也很棒。或者,如果我两次加入表,上面的代码就可以了,那也是很好的信息。非常感谢任何帮助!!!
您可以在
query._join_entities
中找到连接表
joined_tables = [mapper.class_ for mapper in query._join_entities]
自 SQLAlchemy 1.4 起,早期提出的解决方案(包括
_join_entities
)不再起作用。
我尝试在 SQLAlchemy 1.4 中解决这个问题,但有一个警告:
from sqlalchemy.sql import visitors
from contextlib import suppress
def _has_entity(self, model) -> bool:
for visitor in visitors.iterate(self.statement):
# Checking for `.join(Parent.child)` clauses
if visitor.__visit_name__ == 'binary':
for vis in visitors.iterate(visitor):
# Visitor might not have table attribute
with suppress(AttributeError):
# Verify if already present based on table name
if model.__table__.fullname == vis.table.fullname:
return True
# Checking for `.join(Child)` clauses
if visitor.__visit_name__ == 'table':
# Visitor might be of ColumnCollection or so,
# which cannot be compared to model
with suppress(TypeError):
if model == visitor.entity_namespace:
return True
# Checking for `Model.column` clauses
if visitor.__visit_name__ == "column":
with suppress(AttributeError):
if model.__table__.fullname == visitor.table.fullname:
return True
return False
def unique_join(self, model, *args, **kwargs):
"""Join if given model not yet in query"""
if not self._has_entity(model):
self = self.join(model, *args, **kwargs)
return self
Query._has_entity = _has_entity
Query.unique_join = unique_join
对于 SQLAlchemy 1.3 及之前的版本,@mtoloo 和 @r-m-n 有完美的答案,为了完整起见,我将它们包括在内。
在项目初始化的某些地方,向 sqlalchemy.orm.Query 对象添加 unique_join 方法,如下所示:
def unique_join(self, *props, **kwargs):
if props[0] in [c.entity for c in self._join_entities]:
return self
return self.join(*props, **kwargs)
现在使用query.unique_join代替query.join:
Query.unique_join = unique_join
根据r-m-n答案:
在项目初始化的某些地方,向
unique_join
对象添加 sqlalchemy.orm.Query
方法,如下所示:
def unique_join(self, *props, **kwargs):
if props[0] in [c.entity for c in self._join_entities]:
return self
return self.join(*props, **kwargs)
Query.unique_join = unique_join
现在使用
query.unique_join
代替 query.join
:
query = query.unique_join(Tag)
您可以通过
statement._setup_joins
检查SQLAlchemy 2.0中的连接表。也许在 1.4 中也可以,但还没有检查过。
Sqlalchemy 2.0版本
from sqlalchemy.orm import Query
from sqlalchemy.sql import coercions, roles
from sqlalchemy.sql._typing import _JoinTargetArgument, _OnClauseArgument
def idempotent_join(
query: Query,
target: _JoinTargetArgument,
onclause: _OnClauseArgument | None = None,
*,
isouter: bool = False,
full: bool = False,
) -> Query:
"""Idempotent version of Query.join method"""
join_target = coercions.expect(roles.JoinTargetRole, target, apply_propagate_attrs=query, legacy=True)
if onclause is not None:
onclause_element = coercions.expect(roles.OnClauseRole, onclause, legacy=True)
else:
onclause_element = None
setup_entry = (
join_target,
onclause_element,
None,
{
"isouter": isouter,
"full": full,
},
)
if setup_entry not in query._setup_joins:
query._setup_joins += (setup_entry,)
query.__dict__.pop("_last_joined_entity", None)
return query
用途:
def search(title_arg, desc_arg):
query = User.query
if title_arg:
query = idempotent_join(query, Tag)
query = query.filter(Tag.title.contains(title_arg))
if desc_arg:
query = idempotent_join(query, Tag)
query = query.filter(Tag.description.contains(desc_arg))
return query