我正在尝试将这个原始 PostgreSQL 查询转换为 SQLAlchemy:
SELECT a.main.id, a.main.name, a.access.user_id
(select count(*) from a.access where a.access.main_id = a.main.id and a.access.is_admin IS FALSE) as count
FROM a.main
INNER JOIN a.access ON a.access.main_id = a.main.id
where a.access.user_id = 'myusername'
group by a.main.id, a.main.name, a.access.user_id
所以我在 SQLAlchemy 中所做的就是这个
subquery = session.query(
func.count().label("count")
).filter(
Access.main_id == Main.id,
Access.is_admin.is_(False),
).subquery()
stmt = session.query(
Main.id,
Main.name
Access.user_id
subquery.c.count
).join(
Access,
Access.main_id = Main.id
).filter(
Access.user_id = 'myusername'
).group_by(
Main.id,
Main.name,
Access.user_id,
subquery.c.count
).all()
但是在检查
subquery
时,它显示查询为:
SELECT count(*)
FROM a.access, a.main
WHERE a.access.main_id = a.main.id
这就是我陷入困境的地方。我尝试查看这个question,但这仅处理子查询位于 where 子句中的情况。
你的问题如何将原始sql转换为sqlalchemy显然太宽泛了,但我认为这会带来一个你可以使用或尝试的解决方案:
main_alias = aliased(Main)
access_alias = aliased(Access)
query = (
session.query(
Main.id,
Main.name,
Access.user_id,
session.query(func.count())
.filter(access_alias.main_id == main_alias.id, access_alias.is_admin.is_(False))
.correlate(main_alias)
.as_scalar()
.label("count"),
)
.join(access_alias, access_alias.main_id == Main.id)
.filter(Access.user_id == "myusername")
.group_by(Main.id, Main.name, Access.user_id)
)
# Execute the query and retrieve the results
results = query.all()
这里的想法是,它不使用显式子查询,而是使用相同的表作为别名,它连接并关联到该表,然后使用适当的过滤器选择计数(这将 where 子句放在它所属的位置)
我还认为按 subquery.c.count 分组是您原始查询中的错误?
还有一个使用 SQLAlchemy Core 的示例,它可能更接近您显式使用子查询的想法,但语法显然非常不同:
from sqlalchemy import select, func, and_
subquery = select([func.count()]).where(and_(a.access.c.main_id == a.main.c.id, a.access.c.is_admin == False)).label('count')
stmt = select([a.main.c.id, a.main.c.name, a.access.c.user_id, subquery]).select_from(a.main.join(a.access)).where(a.access.c.user_id == 'myusername').group_by(a.main.c.id, a.main.c.name, a.access.c.user_id)
请告诉我这是否有帮助