如何将原始查询转换为 sqlalchemy,用于外部查询

问题描述 投票:0回答:1

我正在尝试将这个原始 PostgreSQL 查询转换为 SQLAlchemy:

SELECT a.main.id, a.main.name, a.access.user_id
(select count(*) from a.access where a.access.main_id = a.main.id and a.access.is_admin IS FALSE) as count
FROM a.main
INNER JOIN a.access ON a.access.main_id = a.main.id
where a.access.user_id = 'myusername' 
group by a.main.id, a.main.name, a.access.user_id

所以我在 SQLAlchemy 中所做的就是这个

        subquery = session.query(
            func.count().label("count")
        ).filter(
            Access.main_id == Main.id,
            Access.is_admin.is_(False),
        ).subquery()

        stmt = session.query(
            Main.id,
            Main.name
            Access.user_id
            subquery.c.count
        ).join(
            Access,
            Access.main_id = Main.id
        ).filter(
            Access.user_id = 'myusername'
        ).group_by(
            Main.id,
            Main.name,
            Access.user_id,
            subquery.c.count
        ).all()

但是在检查

subquery
时,它显示查询为:

SELECT count(*)
FROM a.access, a.main
WHERE a.access.main_id = a.main.id

这就是我陷入困境的地方。我尝试查看这个question,但这仅处理子查询位于 where 子句中的情况。

sqlalchemy
1个回答
0
投票

你的问题如何将原始sql转换为sqlalchemy显然太宽泛了,但我认为这会带来一个你可以使用或尝试的解决方案:

main_alias = aliased(Main)
access_alias = aliased(Access)
query = (
    session.query(
        Main.id,
        Main.name,
        Access.user_id,
        session.query(func.count())
        .filter(access_alias.main_id == main_alias.id, access_alias.is_admin.is_(False))
        .correlate(main_alias)
        .as_scalar()
        .label("count"),
    )
    .join(access_alias, access_alias.main_id == Main.id)
    .filter(Access.user_id == "myusername")
    .group_by(Main.id, Main.name, Access.user_id)
)

# Execute the query and retrieve the results
results = query.all()

这里的想法是,它不使用显式子查询,而是使用相同的表作为别名,它连接并关联到该表,然后使用适当的过滤器选择计数(这将 where 子句放在它所属的位置)

我还认为按 subquery.c.count 分组是您原始查询中的错误?

还有一个使用 SQLAlchemy Core 的示例,它可能更接近您显式使用子查询的想法,但语法显然非常不同:

from sqlalchemy import select, func, and_

subquery = select([func.count()]).where(and_(a.access.c.main_id == a.main.c.id, a.access.c.is_admin == False)).label('count')
stmt = select([a.main.c.id, a.main.c.name, a.access.c.user_id, subquery]).select_from(a.main.join(a.access)).where(a.access.c.user_id == 'myusername').group_by(a.main.c.id, a.main.c.name, a.access.c.user_id)

请告诉我这是否有帮助

© www.soinside.com 2019 - 2024. All rights reserved.