SQLAlchemy 根据日期范围查询子记录

问题描述 投票:0回答:1

我正在开发一个 Flask+SQLAlchemy 项目,用于跟踪 API 测试结果。

我在sql alchemy中设置了3张表,1:M:M关系来跟踪API测试的测试结果。我正在跟踪整个 API

Endpoint
、该 API (
TestCase
) 的功能以及各个
TestResult
的功能。

模型设置如下:

class Endpoint(Base):
    __tablename__ = "endpoint"
    # Primary Key
    id: Mapped[int] = mapped_column(primary_key=True)
    # Data
    api_route: Mapped[str] = mapped_column(String(300), unique=True, nullable=False)
    # Relationships
    ## Links TestCase.endpoint
    test_cases: Mapped[List['TestCase']] = relationship(back_populates="endpoint")

    @property
    def serialize(self):
        return {
            'id': self.id,
            'api_route': self.api_route,
            'test_cases': [i.serialize for i in self.test_cases],
            'health': self.health
        }
class TestCase(Base):
    __tablename__ = "test_case"
    # Primary Key
    id: Mapped[int] = mapped_column(primary_key=True)
    # Foreign Keys
    endpoint_fk: Mapped[int] = mapped_column(ForeignKey("endpoint.id"))
    # Relationships
    ## Links Endpoint.test_cases
    endpoint: Mapped['Endpoint'] = relationship(back_populates="test_cases")
    ## Links TestResult.test_case
    results: Mapped[List['TestResult']] = relationship(back_populates="test_case")
    # Data
    test_case: Mapped[str] = mapped_column(String(50))

    @property
    def serialize(self):
        return {
            'id': self.id,
            'endpoint_fk': self.endpoint_fk,
            'test_case': self.test_case,
            'results': [i.serialize for i in self.results],
            'health': self.health,
        }
class TestResult(Base):
    __tablename__ = "testresult"
    # Primary Key
    id: Mapped[int] = mapped_column(primary_key=True)
    # Foreign Keys
    test_case_fk: Mapped[int] = mapped_column(ForeignKey("test_case.id"))
    # Data
    result: Mapped[int] = mapped_column(Integer())
    timestamp: Mapped[DateTime] = mapped_column(DateTime(timezone=True), server_default=func.now())
    error: Mapped[str] = mapped_column(String(3000))
    # Relationships
    # Links TestCase.results
    test_case: Mapped['TestCase'] = relationship(back_populates="results")

    @property
    def serialize(self):
        return {
            'id': self.id,
            'test_case_fk': self.test_case_fk,
            'result': self.result,
            'timestamp': self.timestamp,
            'error': self.error
        }

我现在正在开发一个显示过去 24 小时结果的仪表板,并且根据开始和结束日期过滤我的查询似乎相当简单。我像这样查询表以返回

Endpoint
对象,然后
serialize
通过定义的关系获取所有子数据。我希望返回我的 Endpoint 对象,因为那里有一些附加属性,这些属性是根据子数据计算的,而无需处理查询中的聚合。

query = db.session.query(Endpoint) \
    .join(TestCase, Endpoint.id == TestCase.endpoint_fk) \
    .join(TestResult, TestCase.id == TestResult.test_case_fk) \
    .filter(TestResult.timestamp.between(one_day_ago, dt_now)) \
    .order_by(Endpoint.id)
logger.info(query)

res = [r.serialize for r in query.all()]

执行时,它会返回所有测试结果的所有数据,而不是仅限于

one_day_ago
dt_now
之间的数据。

老实说,我有点不明白为什么我会得到所有这些额外的数据,其中

TestResult
记录属于指定的日期范围之外。

我尝试只做一些原始 SQL 和类似的工作,并正确返回适当数量的行。

SELECT *
FROM endpoint JOIN test_case ON endpoint.id = test_case.endpoint_fk JOIN testresult ON test_case.id = testresult.test_case_fk 
WHERE testresult.timestamp BETWEEN '2023-08-28T00:00:00.112993-04:00' AND '2023-08-29T23:59:59.112993-04:00'
ORDER BY endpoint.id

我打开了数据库回显,看起来延迟加载正在执行其应该执行的操作,但在获取子记录时原始 WHERE 子句并未被保留。

2023-08-29 13:27:26,992 INFO sqlalchemy.engine.Engine
SELECT endpoint.id AS endpoint_id, endpoint.api_route AS endpoint_api_route 
FROM endpoint JOIN test_case ON endpoint.id = test_case.endpoint_fk JOIN testresult ON test_case.id = testresult.test_case_fk 
WHERE testresult.timestamp BETWEEN %(timestamp_1)s AND %(timestamp_2)s ORDER BY endpoint.id
2023-08-29 13:27:26,992 INFO sqlalchemy.engine.Engine [generated in 0.00015s] {'timestamp_1': datetime.datetime(2023, 8, 28, 13, 27, 26, 978617), 'timestamp_2': datetime.datetime(2023, 8, 29, 13, 27, 26, 978617)}

2023-08-29 13:27:26,998 INFO sqlalchemy.engine.Engine
SELECT test_case.endpoint_fk AS test_case_endpoint_fk, test_case.id AS test_case_id, test_case.test_case AS test_case_test_case 
FROM test_case 
WHERE test_case.endpoint_fk IN (%(primary_keys_1)s, %(primary_keys_2)s)
2023-08-29 13:27:26,998 INFO sqlalchemy.engine.Engine [generated in 0.00028s] {'primary_keys_1': 1, 'primary_keys_2': 2}

2023-08-29 13:27:27,000 INFO sqlalchemy.engine.Engine
SELECT testresult.test_case_fk AS testresult_test_case_fk, testresult.id AS testresult_id, testresult.result AS testresult_result, testresult.timestamp AS testresult_timestamp, testresult.error AS testresult_error 
FROM testresult 
WHERE testresult.test_case_fk IN (%(primary_keys_1)s, %(primary_keys_2)s, %(primary_keys_3)s, %(primary_keys_4)s, %(primary_keys_5)s, %(primary_keys_6)s, %(primary_keys_7)s, %(primary_keys_8)s, %(primary_keys_9)s, %(primary_keys_10)s, %(primary_keys_11)s, %(primary_keys_12)s, %(primary_keys_13)s, %(primary_keys_14)s, %(primary_keys_15)s, %(primary_keys_16)s, %(primary_keys_17)s, %(primary_keys_18)s, %(primary_keys_19)s, %(primary_keys_20)s, %(primary_keys_21)s, %(primary_keys_22)s)
2023-08-29 13:27:27,000 INFO sqlalchemy.engine.Engine [generated in 0.00017s] {'primary_keys_1': 1, 'primary_keys_2': 2, 'primary_keys_3': 3, 'primary_keys_4': 4, 'primary_keys_5': 5, 'primary_keys_6': 6, 'primary_keys_7': 7, 'primary_keys_8': 8, 'primary_keys_9': 9, 'primary_keys_10': 10, 'primary_keys_11': 11, 'primary_keys_12': 12, 'primary_keys_13': 13, 'primary_keys_14': 14, 'primary_keys_15': 15, 'primary_keys_16': 16, 'primary_keys_17': 17, 'primary_keys_18': 18, 'primary_keys_19': 19, 'primary_keys_20': 20, 'primary_keys_21': 21, 'primary_keys_22': 22}

对于如何限制返回的数据并且不获取所有这些额外记录,有什么想法/建议/帮助吗?我曾想过也许可以使用子查询进行查询以获取日期范围内的

TestResult
,然后加入到该结果集,但尚未弄清楚其语法。

python flask sqlalchemy flask-sqlalchemy
1个回答
0
投票

因为我有点怀疑延迟加载是我的罪魁祸首,一旦选择了

Endpoint
记录,即使我指定了过滤器/where 子句,它也只是获取与该键匹配的所有子记录。

在阅读了更多文档之后,我找到了使延迟加载符合我想要的条件所必需的东西。

文档:https://docs.sqlalchemy.org/en/20/orm/queryguide/api.html#sqlalchemy.orm.with_loader_criteria

通过关系查询并获取所有数据的最终结果

query = db.select(Endpoint) \
        .options(
            with_loader_criteria(TestResult, TestResult.timestamp.between(the_day_before, start_date))
        ).order_by(Endpoint.id)
© www.soinside.com 2019 - 2024. All rights reserved.