在我的框架中,在测试/设置/conftest之外有逻辑,其中创建一个从数据库检索数据的实例,然后在收集阶段形成要纳入 pytest.mark.parametrize 的测试数量。
我面临这样一个事实:某些情况下的数据量足够大,足以生成 500 万个测试。我尽最大努力优化了系统,但超出了docker 8GB的内存消耗限制;就我而言,在收集阶段高峰时的此类运行会在收集阶段消耗 11GB。
提出了将它们分成小组的想法;我可以将原始数据分解为位于本地的数据库片段。总的来说,就我而言,每次运行将包含 20 万次测试,这对我来说是完全可以接受的。
问题是我理想地希望在一次运行中组织代码,比方说 --batches args 我可以编写内部逻辑来为每个参数生成新的参数化,但我不知道如何拦截 pytest 以便在完成一组测试后,它会开始重新运行另一组测试。
我尝试了以下选项 - 一个单独的 python 脚本,使用某些参数(如 --batch_group 1 等)循环运行 pytest.main()
此方法不起作用,因为在这个 python 进程中,我的逻辑实例仅初始化一次,并且运行本质上并不新鲜。下面的例子:
class Logger:
_instance = None
def __new__(cls):
if cls._instance is None:
print("Creating new instance")
cls._instance = super(Logger, cls).__new__(cls)
return cls._instance
import pytest
def test_1():
pytest.logger = Logger()
print(f'{id(pytest.logger)=}')
def test_2():
pytest.logger = Logger()
print(f'{id(pytest.logger)=}')
if __name__ == "__main__":
pytest.main(['-k', 'test_1'])
pytest.main(['-k', 'test_2'])
id(pytest.logger) = 140325718144768
id(pytest.logger) = 140325718144768
也就是说,这里的运行不会是新鲜和独立的,尽管 pytest 使用不同的参数启动,但单例将工作并且实例不会根据新数据重新定义
收集后拆分测试的方法不合适,因为从内存消耗来看,每次运行将收集 500 万个测试并崩溃。也就是说,您需要在收集阶段之前分组
用 @pytest.mark 标记测试也不适合我,因为在我的情况下,一个标记可以收集多达 400 万个测试
我剩下的唯一方法是简单地使用 subprocess 或 bash 在循环中进行独立的 pytest 运行。但也许有人可以提出更优雅的方法?预先感谢
更新:我发现实现此目的的唯一方法是为每个 pytest 运行(批处理)启动单独的 subprocess.run,并实现逻辑以在之后合并结果
# run_batches.py
filtered_batches = sorted(filter(lambda _batch: case_name.lower() in _batch, os.listdir(json_batches_path)))
for batch in filtered_batches:
subprocess.run(
[
"pytest",
f"--single-report={report_folder}",
f"--batch={batch}",
"-q",
"--tb=no",
"--capture=no",
"-p no:cacheprovider"
]
)
# merge all reports
subprocess.run(["pytest", f"--merge-report={report_folder}"])
我能提供什么
subprocess
正如您提到的:subprocess.run
为每个批次生成一个新的 Python 进程,从而确保没有状态从一个批次转移到下一个批次。import subprocess
num_batches = 25 # Total number of batches you want
for i in range(num_batches):
subprocess.run(["pytest", "--batch_group", str(i)])
import pytest
def pytest_addoption(parser):
parser.addoption("--batch_group", type=int, default=0, help="batch group number")
def pytest_generate_tests(metafunc):
batch_group = metafunc.config.getoption("batch_group")
start, end = compute_batch_limits(batch_group)
# Fetch the subset of data for the current batch_group
data_for_batch = fetch_data_from_db(start, end)
if "db_data" in metafunc.fixturenames:
metafunc.parametrize("db_data", data_for_batch)
def compute_batch_limits(batch_group):
# Compute the start and end limits for the data fetch query
# based on the batch_group number
# ...
def fetch_data_from_db(start, end):
# Fetch a subset of data based on the start and end limits
# ...