SQLAlchemy filter on a query returns extra garbage rows again


I have Django experience. Now I want to try out some DIY Python packages in my mini project. Here is my code.

models.py

import enum
from datetime import date, datetime

from sqlalchemy import UniqueConstraint
from sqlmodel import Field, SQLModel


class StatusEnum(enum.Enum):
    """Enum class of status field."""
    pending = "pending"
    in_progress = "in_progress"
    completed = "completed"


class TaskContent(SQLModel, table=True):
    """Model class for TaskContent history."""
    identifier: str = Field(primary_key=True)   # For redo mechanism
    id: int = Field(primary_key=False)  # For human use
    title: str = Field(nullable=True)
    description: str = Field(nullable=True)
    due_date: date = Field(default=None, nullable=True)
    status: StatusEnum = Field(default=StatusEnum.pending)
    is_deleted: bool = Field(default=False)  # For redo mechanism
    created_by: int = Field(nullable=True, default=None, foreign_key="user.id")
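    # default_factory is called per row; datetime.now() would be evaluated only once, at import time.
    created_at: datetime = Field(default_factory=datetime.now)  # For redo mechanism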


class CurrentTaskContent(SQLModel, table=True):
    """Model class for current."""
    # https://github.com/tiangolo/sqlmodel/issues/114
    __table_args__ = (UniqueConstraint("identifier", "id"), )

    identifier: str = Field(primary_key=True)   # For redo mechanism
    id: int = Field(primary_key=False)  # For human use
    created_by: int = Field(nullable=True, default=None, foreign_key="user.id")
    updated_by: int = Field(nullable=True, default=None, foreign_key="user.id")
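    # default_factory is called per row; datetime.now() would be evaluated only once, at import time.
    created_at: datetime = Field(default_factory=datetime.now)
    updated_at: datetime = Field(default_factory=datetime.now)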


class User(SQLModel, table=True):
    """User model of this application."""
    id: int = Field(primary_key=True)
    username: str = Field(nullable=False)

main.py

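import sqlalchemy.orm
from sqlalchemy import and_
from sqlmodel import Session  # assumption: Session comes from sqlmodel

from models import CurrentTaskContent, TaskContent, User

# NOTE: `engine` is assumed to be created elsewhere in the project.


def get_queryset() -> sqlalchemy.orm.query.Query:
    """Get the queryset of tasks."""
    with Session(engine) as session: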
        # List out available id.
        available_id_list = session.query(CurrentTaskContent).all()
        available_identifier_list = [i.identifier for i in available_id_list]

        # Get task and id
        queryset_tasks = session.query(TaskContent).filter(
            TaskContent.identifier.in_(available_identifier_list)
        ).subquery()

        # Get username and id
        username_query = session.query(User.username, User.id).subquery()

        # Join the queryset and username
        final_query = session.query(queryset_tasks, username_query).outerjoin(
            username_query, and_(queryset_tasks.c.created_by == username_query.c.id)
        )
        return final_query

tasks_queryset.count() returns 5, which is correct: my database has only 5 rows. I then tried to filter again by due_date, using the aa and bb statements shown below.

Problem:

aa and bb both give me 25 rows, which is not what I need.

aa = tasks_queryset.filter(tasks_queryset.subquery().c.due_date == due_date_instance)
bb = tasks_queryset.filter(TaskContent.due_date == due_date_instance)
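
A likely explanation for the 25, offered as an assumption since the thread itself does not confirm it: both filters refer to a FROM element that is not part of tasks_queryset (a fresh subquery in aa, the taskcontent table in bb), so the database pairs each of the 5 joined rows with every matching row of that extra element. The sketch below reproduces the effect in plain SQL with the standard-library sqlite3 module. The cut-down table, the sample dates, and the alias anon are illustrative and not taken from the original project.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE taskcontent (identifier TEXT PRIMARY KEY, due_date TEXT)"
)
# Five rows that all share the same due date (made-up sample data).
conn.executemany(
    "INSERT INTO taskcontent VALUES (?, ?)",
    [(f"task-{n}", "2024-01-01") for n in range(5)],
)

# Stand-in for the already-built query that gets wrapped as a subquery.
base = "SELECT identifier, due_date FROM taskcontent"

# Filtering on a table that is not in the FROM clause adds it as a second
# FROM entry, so every subquery row pairs with every matching table row.
cross_count = conn.execute(
    f"SELECT count(*) FROM ({base}) AS anon, taskcontent "
    "WHERE taskcontent.due_date = ?",
    ("2024-01-01",),
).fetchone()[0]

# Filtering on the subquery's own column keeps a single FROM entry.
direct_count = conn.execute(
    f"SELECT count(*) FROM ({base}) AS anon WHERE anon.due_date = ?",
    ("2024-01-01",),
).fetchone()[0]

print(cross_count, direct_count)
```

With 5 rows sharing one due date, the first count is 25 (5 x 5) and the second is 5.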

Solution:

Put the due_date condition into the table query itself.

        queryset_tasks = session.query(TaskContent).filter(
            TaskContent.identifier.in_(available_identifier_list),
            or_(TaskContent.due_date == _due_date, _due_date is None)
        ).subquery()
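
Note that the expression _due_date is None is evaluated by Python before SQLAlchemy sees it, so or_() receives a plain True or False rather than a SQL expression. An arguably clearer way to get the same effect is to add the condition only when a date is supplied. The sketch below is an illustration under assumptions: it uses a cut-down TaskContent model built with plain SQLAlchemy (1.4 or later) instead of SQLModel, an unbound Session, and a hypothetical helper named build_task_query.

```python
from datetime import date
from typing import List, Optional

from sqlalchemy import Column, Date, String
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class TaskContent(Base):
    """Cut-down stand-in for the TaskContent model in the question."""
    __tablename__ = "taskcontent"
    identifier = Column(String, primary_key=True)
    due_date = Column(Date, nullable=True)


def build_task_query(session: Session, identifiers: List[str],
                     due_date: Optional[date] = None):
    """Hypothetical helper: filter by due_date only when one is given."""
    query = session.query(TaskContent).filter(
        TaskContent.identifier.in_(identifiers)
    )
    if due_date is not None:
        query = query.filter(TaskContent.due_date == due_date)
    return query


# An unbound session is enough to build the query and print its SQL.
session = Session()
unfiltered_sql = str(build_task_query(session, ["a", "b"]))
filtered_sql = str(build_task_query(session, ["a", "b"], date(2024, 1, 1)))
print(filtered_sql)
```

Only the second statement carries a due_date comparison in its WHERE clause.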

Question:

How do I filter by due_date using the given tasks_queryset?

python sqlalchemy fastapi
1 Answer

If I'm following your query correctly, you should be able to simplify it with joins and avoid the subqueries altogether.

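def get_queryset(_due_date=None):
    """Get the queryset of tasks, optionally filtered by due date."""
    with Session(engine) as session:
        final_query = (
            session.query(TaskContent, User.username)
            # Inner join: tasks whose created_by is NULL are dropped;
            # use .outerjoin() here to keep them, as in the question.
            .join(User, TaskContent.created_by == User.id)
            # Inner join: keeps only tasks that exist in CurrentTaskContent.
            .join(
                CurrentTaskContent,
                CurrentTaskContent.identifier == TaskContent.identifier,
            )
            .filter(
                # Redundant after the inner join above, but harmless.
                (CurrentTaskContent.identifier != None),
                # `_due_date is None` is evaluated in Python, not in SQL.
                or_(TaskContent.due_date == _due_date, _due_date is None),
            )
        )
        return final_query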
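
To check the join-based approach end to end, here is a minimal sketch against an in-memory SQLite database. It is not the answerer's code: it assumes SQLAlchemy 1.4 or later, uses cut-down plain SQLAlchemy models instead of SQLModel, invents the sample rows, uses an outer join to User (as in the question) so tasks without a creator are kept, and introduces two hypothetical helpers, build_query and filter_existing. The second helper shows one possible way to filter an already-built query afterwards: wrap it in a subquery and filter on that subquery's own due_date column, so no second FROM element is added.

```python
from datetime import date

from sqlalchemy import Column, Date, ForeignKey, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class User(Base):
    __tablename__ = "user"
    id = Column(Integer, primary_key=True)
    username = Column(String, nullable=False)


class TaskContent(Base):
    __tablename__ = "taskcontent"
    identifier = Column(String, primary_key=True)
    due_date = Column(Date, nullable=True)
    created_by = Column(Integer, ForeignKey("user.id"), nullable=True)


class CurrentTaskContent(Base):
    __tablename__ = "currenttaskcontent"
    identifier = Column(String, primary_key=True)


def build_query(session, due_date=None):
    """Hypothetical helper: join-based query with an optional due_date filter."""
    query = (
        session.query(TaskContent.identifier, TaskContent.due_date, User.username)
        .select_from(TaskContent)
        .join(CurrentTaskContent,
              CurrentTaskContent.identifier == TaskContent.identifier)
        .outerjoin(User, TaskContent.created_by == User.id)
    )
    if due_date is not None:
        query = query.filter(TaskContent.due_date == due_date)
    return query


def filter_existing(session, query, due_date):
    """Hypothetical helper: filter an already-built query via a subquery."""
    sub = query.subquery()
    return session.query(sub).filter(sub.c.due_date == due_date)


engine = create_engine("sqlite://")  # in-memory database, demo only
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([
        User(id=1, username="alice"),
        TaskContent(identifier="t1", due_date=date(2024, 1, 1), created_by=1),
        TaskContent(identifier="t2", due_date=date(2024, 1, 1), created_by=None),
        TaskContent(identifier="t3", due_date=date(2024, 2, 1), created_by=1),
        # Not listed in CurrentTaskContent, so the inner join excludes it.
        TaskContent(identifier="old", due_date=date(2024, 1, 1), created_by=1),
        CurrentTaskContent(identifier="t1"),
        CurrentTaskContent(identifier="t2"),
        CurrentTaskContent(identifier="t3"),
    ])
    session.commit()

    target = date(2024, 1, 1)
    total = build_query(session).count()
    filtered = build_query(session, target).count()
    wrapped = filter_existing(session, build_query(session), target).count()

print(total, filtered, wrapped)
```

With the sample rows above this prints 3 2 2: three current tasks, two of them due on the target date, whether the filter is applied while building the query or afterwards through the subquery.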