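I'm working on a Flask + SQLAlchemy project that tracks API test results.
I set up three tables in SQLAlchemy in a 1:M:M relationship to track the results of API tests. I'm tracking the overall API (Endpoint), the features of that API (TestCase), and the individual results for each feature (TestResult).
The models are set up as follows: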
    from typing import List

    from sqlalchemy import DateTime, ForeignKey, Integer, String, func
    from sqlalchemy.orm import Mapped, mapped_column, relationship

    # Base is the project's declarative base class (defined elsewhere).

    class Endpoint(Base):
        __tablename__ = "endpoint"
        # Primary Key
        id: Mapped[int] = mapped_column(primary_key=True)
        # Data
        api_route: Mapped[str] = mapped_column(String(300), unique=True, nullable=False)
        # Relationships
        ## Links TestCase.endpoint
        test_cases: Mapped[List['TestCase']] = relationship(back_populates="endpoint")

        @property
        def serialize(self):
            return {
                'id': self.id,
                'api_route': self.api_route,
                'test_cases': [i.serialize for i in self.test_cases],
                'health': self.health
            }

    class TestCase(Base):
        __tablename__ = "test_case"
        # Primary Key
        id: Mapped[int] = mapped_column(primary_key=True)
        # Foreign Keys
        endpoint_fk: Mapped[int] = mapped_column(ForeignKey("endpoint.id"))
        # Relationships
        ## Links Endpoint.test_cases
        endpoint: Mapped['Endpoint'] = relationship(back_populates="test_cases")
        ## Links TestResult.test_case
        results: Mapped[List['TestResult']] = relationship(back_populates="test_case")
        # Data
        test_case: Mapped[str] = mapped_column(String(50))

        @property
        def serialize(self):
            return {
                'id': self.id,
                'endpoint_fk': self.endpoint_fk,
                'test_case': self.test_case,
                'results': [i.serialize for i in self.results],
                'health': self.health,
            }

    class TestResult(Base):
        __tablename__ = "testresult"
        # Primary Key
        id: Mapped[int] = mapped_column(primary_key=True)
        # Foreign Keys
        test_case_fk: Mapped[int] = mapped_column(ForeignKey("test_case.id"))
        # Data
        result: Mapped[int] = mapped_column(Integer())
        timestamp: Mapped[DateTime] = mapped_column(DateTime(timezone=True), server_default=func.now())
        error: Mapped[str] = mapped_column(String(3000))
        # Relationships
        ## Links TestCase.results
        test_case: Mapped['TestCase'] = relationship(back_populates="results")

        @property
        def serialize(self):
            return {
                'id': self.id,
                'test_case_fk': self.test_case_fk,
                'result': self.result,
                'timestamp': self.timestamp,
                'error': self.error
            }

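I'm now building a dashboard that shows the results from the past 24 hours, and filtering my query by a start and end date seemed fairly straightforward. I query the tables like this to return Endpoint objects, then call serialize to pull in all the child data through the defined relationships. I want to get my Endpoint objects back because they carry some additional properties that are computed from the child data, which saves me from handling the aggregation in the query.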
    query = db.session.query(Endpoint) \
        .join(TestCase, Endpoint.id == TestCase.endpoint_fk) \
        .join(TestResult, TestCase.id == TestResult.test_case_fk) \
        .filter(TestResult.timestamp.between(one_day_ago, dt_now)) \
        .order_by(Endpoint.id)
    logger.info(query)
    res = [r.serialize for r in query.all()]
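When executed, it returns all data for all test results instead of only the data between one_day_ago and dt_now.
Honestly, I'm a bit confused about why I'm getting all this extra data, including TestResult records that fall outside the specified date range.
I tried running roughly the same thing as raw SQL, and it correctly returns the appropriate number of rows.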
SELECT *
FROM endpoint JOIN test_case ON endpoint.id = test_case.endpoint_fk JOIN testresult ON test_case.id = testresult.test_case_fk
WHERE testresult.timestamp BETWEEN '2023-08-28T00:00:00.112993-04:00' AND '2023-08-29T23:59:59.112993-04:00'
ORDER BY endpoint.id
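I turned on database echo, and it looks like lazy loading is doing what it is supposed to do, but the original WHERE clause is not carried over when the child records are fetched.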
2023-08-29 13:27:26,992 INFO sqlalchemy.engine.Engine
SELECT endpoint.id AS endpoint_id, endpoint.api_route AS endpoint_api_route
FROM endpoint JOIN test_case ON endpoint.id = test_case.endpoint_fk JOIN testresult ON test_case.id = testresult.test_case_fk
WHERE testresult.timestamp BETWEEN %(timestamp_1)s AND %(timestamp_2)s ORDER BY endpoint.id
2023-08-29 13:27:26,992 INFO sqlalchemy.engine.Engine [generated in 0.00015s] {'timestamp_1': datetime.datetime(2023, 8, 28, 13, 27, 26, 978617), 'timestamp_2': datetime.datetime(2023, 8, 29, 13, 27, 26, 978617)}
2023-08-29 13:27:26,998 INFO sqlalchemy.engine.Engine
SELECT test_case.endpoint_fk AS test_case_endpoint_fk, test_case.id AS test_case_id, test_case.test_case AS test_case_test_case
FROM test_case
WHERE test_case.endpoint_fk IN (%(primary_keys_1)s, %(primary_keys_2)s)
2023-08-29 13:27:26,998 INFO sqlalchemy.engine.Engine [generated in 0.00028s] {'primary_keys_1': 1, 'primary_keys_2': 2}
2023-08-29 13:27:27,000 INFO sqlalchemy.engine.Engine
SELECT testresult.test_case_fk AS testresult_test_case_fk, testresult.id AS testresult_id, testresult.result AS testresult_result, testresult.timestamp AS testresult_timestamp, testresult.error AS testresult_error
FROM testresult
WHERE testresult.test_case_fk IN (%(primary_keys_1)s, %(primary_keys_2)s, %(primary_keys_3)s, %(primary_keys_4)s, %(primary_keys_5)s, %(primary_keys_6)s, %(primary_keys_7)s, %(primary_keys_8)s, %(primary_keys_9)s, %(primary_keys_10)s, %(primary_keys_11)s, %(primary_keys_12)s, %(primary_keys_13)s, %(primary_keys_14)s, %(primary_keys_15)s, %(primary_keys_16)s, %(primary_keys_17)s, %(primary_keys_18)s, %(primary_keys_19)s, %(primary_keys_20)s, %(primary_keys_21)s, %(primary_keys_22)s)
2023-08-29 13:27:27,000 INFO sqlalchemy.engine.Engine [generated in 0.00017s] {'primary_keys_1': 1, 'primary_keys_2': 2, 'primary_keys_3': 3, 'primary_keys_4': 4, 'primary_keys_5': 5, 'primary_keys_6': 6, 'primary_keys_7': 7, 'primary_keys_8': 8, 'primary_keys_9': 9, 'primary_keys_10': 10, 'primary_keys_11': 11, 'primary_keys_12': 12, 'primary_keys_13': 13, 'primary_keys_14': 14, 'primary_keys_15': 15, 'primary_keys_16': 16, 'primary_keys_17': 17, 'primary_keys_18': 18, 'primary_keys_19': 19, 'primary_keys_20': 20, 'primary_keys_21': 21, 'primary_keys_22': 22}
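The pattern in the log above can be reproduced without the ORM, which may make it easier to see why the extra rows appear. The following is only a minimal sketch using the standard-library sqlite3 module with made-up sample rows; the helper load_endpoints is hypothetical, and the DISTINCT in step 1 is a simplification of how the ORM returns each Endpoint once. Step 1 applies the date filter to decide which endpoints come back; step 2 loads the children by key only, just like the second and third logged statements.

```python
import sqlite3


def load_endpoints(conn, start, end):
    """Hypothetical helper: filter the parents, then load children by key only."""
    # Step 1: the filtered JOIN only decides WHICH endpoints are returned.
    endpoint_ids = [row[0] for row in conn.execute(
        "SELECT DISTINCT endpoint.id FROM endpoint "
        "JOIN test_case ON endpoint.id = test_case.endpoint_fk "
        "JOIN testresult ON test_case.id = testresult.test_case_fk "
        "WHERE testresult.timestamp BETWEEN ? AND ? ORDER BY endpoint.id",
        (start, end))]
    # Step 2: the child load matches on the foreign key alone,
    # so the date filter from step 1 is not repeated here.
    loaded = {}
    for endpoint_id in endpoint_ids:
        rows = conn.execute(
            "SELECT testresult.id FROM testresult "
            "JOIN test_case ON test_case.id = testresult.test_case_fk "
            "WHERE test_case.endpoint_fk = ? ORDER BY testresult.id",
            (endpoint_id,)).fetchall()
        loaded[endpoint_id] = [row[0] for row in rows]
    return loaded


conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE endpoint (id INTEGER PRIMARY KEY, api_route TEXT);
CREATE TABLE test_case (id INTEGER PRIMARY KEY, endpoint_fk INTEGER, test_case TEXT);
CREATE TABLE testresult (id INTEGER PRIMARY KEY, test_case_fk INTEGER,
                         result INTEGER, timestamp TEXT);
INSERT INTO endpoint VALUES (1, '/users');
INSERT INTO test_case VALUES (1, 1, 'list users');
INSERT INTO testresult VALUES (1, 1, 1, '2023-08-01 00:00:00');  -- outside the window
INSERT INTO testresult VALUES (2, 1, 1, '2023-08-29 09:00:00');  -- inside the window
""")

loaded = load_endpoints(conn, "2023-08-28 13:27:26", "2023-08-29 13:27:26")
print(loaded)
```

With this sample data, endpoint 1 qualifies because of result 2, yet both results 1 and 2 are loaded for it, since the second step never sees the date range.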
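Any ideas/suggestions/help on how to limit the data returned and not fetch all these extra records? I've thought about querying with a subquery to get the TestResult rows within the date range and then joining to that result set, but I haven't figured out the syntax yet.
I suspect lazy loading is the culprit: once the Endpoint records are selected, it simply fetches all child records matching that key, even though I specified a filter/WHERE clause.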
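After reading more of the documentation, I found what was needed to make lazy loading honor the criteria I wanted.
Documentation: https://docs.sqlalchemy.org/en/20/orm/queryguide/api.html#sqlalchemy.orm.with_loader_criteria
The final result, which queries through the relationships and gets all the data: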
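    from sqlalchemy.orm import with_loader_criteria

    # The criteria are applied whenever TestResult rows are loaded,
    # including loads triggered through the relationships.
    query = db.select(Endpoint) \
        .options(
            with_loader_criteria(TestResult, TestResult.timestamp.between(the_day_before, start_date))
        ).order_by(Endpoint.id)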