有什么方法可以将测试分成独立的组,每个组都将运行新的 pytest 运行吗?

问题描述 投票:0回答:1

在我的框架中,在测试/设置/conftest之外有逻辑,其中创建一个从数据库检索数据的实例,然后在收集阶段形成要纳入 pytest.mark.parametrize 的测试数量。

我面临这样一个事实:某些情况下的数据量足够大,足以生成 500 万个测试。我尽最大努力优化了系统,但超出了docker 8GB的内存消耗限制;就我而言,在收集阶段高峰时的此类运行会在收集阶段消耗 11GB。

提出了将它们分成小组的想法;我可以将原始数据分解为位于本地的数据库片段。总的来说,就我而言,每次运行将包含 20 万次测试,这对我来说是完全可以接受的。

问题是我理想地希望在一次运行中组织代码,比方说 --batches args 我可以编写内部逻辑来为每个参数生成新的参数化,但我不知道如何拦截 pytest 以便在完成一组测试后,它会开始重新运行另一组测试。

我尝试了以下选项 - 一个单独的 python 脚本,使用某些参数(如 --batch_group 1 等)循环运行 pytest.main()

此方法不起作用,因为在这个 python 进程中,我的逻辑实例仅初始化一次,并且运行本质上并不新鲜。下面的例子:

class Logger:
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            print("Creating new instance")
            cls._instance = super(Logger, cls).__new__(cls)
        return cls._instance

import pytest

def test_1():
    pytest.logger = Logger()
    print(f'{id(pytest.logger)=}')

def test_2():
    pytest.logger = Logger()
    print(f'{id(pytest.logger)=}')

if __name__ == "__main__":
    pytest.main(['-k', 'test_1'])
    pytest.main(['-k', 'test_2'])


id(pytest.logger) = 140325718144768
id(pytest.logger) = 140325718144768

也就是说,这里的运行不会是新鲜和独立的,尽管 pytest 使用不同的参数启动,但单例将工作并且实例不会根据新数据重新定义

收集后拆分测试的方法不合适,因为从内存消耗来看,每次运行将收集 500 万个测试并崩溃。也就是说,您需要在收集阶段之前分组

用 @pytest.mark 标记测试也不适合我,因为在我的情况下,一个标记可以收集多达 400 万个测试

我剩下的唯一方法是简单地使用 subprocess 或 bash 在循环中进行独立的 pytest 运行。但也许有人可以提出更优雅的方法?预先感谢

更新:我发现实现此目的的唯一方法是为每个 pytest 运行(批处理)启动单独的 subprocess.run,并实现逻辑以在之后合并结果

# run_batches.py

filtered_batches = sorted(filter(lambda _batch: case_name.lower() in _batch, os.listdir(json_batches_path)))
for batch in filtered_batches:
    subprocess.run(
        [
            "pytest",
            f"--single-report={report_folder}",
            f"--batch={batch}",
            "-q",
            "--tb=no",
            "--capture=no",
            "-p no:cacheprovider"
        ]
    )

# merge all reports
subprocess.run(["pytest", f"--merge-report={report_folder}"])
python pytest
1个回答
1
投票

我能提供什么

  1. 使用
    subprocess
    正如您提到的:
    您可以使用
    subprocess.run
    为每个批次生成一个新的 Python 进程,从而确保没有状态从一个批次转移到下一个批次。
import subprocess

num_batches = 25  # Total number of batches you want
for i in range(num_batches):
    subprocess.run(["pytest", "--batch_group", str(i)])
  1. 动态测试生成:
    一种更原生的 pytest 方法是动态生成测试。您可以在会话启动期间动态生成批次,然后仅生成当前批次的测试。这里的主要思想是利用 pytest hooks。
    这种方法的优点是您只收集和生成当前批处理组的测试,从而避免内存问题。
import pytest

def pytest_addoption(parser):
    parser.addoption("--batch_group", type=int, default=0, help="batch group number")

def pytest_generate_tests(metafunc):
    batch_group = metafunc.config.getoption("batch_group")
    start, end = compute_batch_limits(batch_group)
    
    # Fetch the subset of data for the current batch_group
    data_for_batch = fetch_data_from_db(start, end)
    
    if "db_data" in metafunc.fixturenames:
        metafunc.parametrize("db_data", data_for_batch)

def compute_batch_limits(batch_group):
    # Compute the start and end limits for the data fetch query
    # based on the batch_group number
    # ...

def fetch_data_from_db(start, end):
    # Fetch a subset of data based on the start and end limits
    # ...
  1. Pytest-xdist:
    如果您尚未使用它,请考虑使用 pytest-xdist,它允许并行测试执行。这可能有助于分配负载并加快执行速度,尽管您仍然需要将其与上述方法之一结合起来来解决内存限制。
  2. 考虑是否有办法优化或减少测试所需的数据量。
© www.soinside.com 2019 - 2024. All rights reserved.