Monkeypatch 用于测试的多进程函数

问题描述 投票:0回答:1

Monkeypatching 使用多进程的函数(通过并发.futures.ProcessPoolExecutor)无法按预期工作。如果使用单进程或多线程(通过并发.futures.ThreadPoolExecutor)编写相同的函数,monkeypatch 将按预期工作。

为什么多进程 Monkeypatch 失败以及如何正确对多进程函数进行 Monkeypatch 进行测试?

下面是说明我的问题的最简单的代码示例。在实际使用中,我会尝试对从另一个模块导入的函数进行猴子补丁。

# file_a.py

import concurrent.futures as ccf

MY_CONSTANT = "hello"

def my_function():
    return MY_CONSTANT


def singleprocess_f():
    result = []
    for _ in range(3):
        result.append(my_function())
    return result


def multithread_f():
    result = []
    with ccf.ThreadPoolExecutor() as executor:
        futures = []
        for _ in range(3):
            future = executor.submit(my_function)
            futures.append(future)
        for future in ccf.as_completed(futures):
            result.append(future.result())
    return result


def multiprocess_f():
    result = []
    with ccf.ProcessPoolExecutor() as executor:
        futures = []
        for _ in range(3):
            future = executor.submit(my_function)
            futures.append(future)
        for future in ccf.as_completed(futures):
            result.append(future.result())
    return result

我希望所有测试都能通过:

# test_file_a.py

from file_a import multiprocess_f, multithread_f, singleprocess_f

# PASSES:
def test_singleprocess_f(monkeypatch):
    monkeypatch.setattr("file_a.MY_CONSTANT", "world")
    result = singleprocess_f()
    assert result == ["world"] * 3

# PASSES:
def test_multithread_f(monkeypatch):
    monkeypatch.setattr("file_a.MY_CONSTANT", "world")
    result = multithread_f()
    assert result == ["world"] * 3

# FAILS:
def test_multiprocess_f(monkeypatch):
    monkeypatch.setattr("file_a.MY_CONSTANT", "world")
    result = multiprocess_f()
    assert result == ["world"] * 3
python testing multiprocessing concurrent.futures
1个回答
0
投票

我已经对你的问题发表了评论,所以这里不再重复。我的猜测是,您正在某些默认使用 spawn 方法来创建新进程的平台上运行(例如 Windows)。如果是这种情况,在主进程中为

MY_CONSTANT
分配什么值并不重要,因为当创建子池进程时,它们将通过重新执行语句
MY_CONSTANT = "hello"
来初始化其内存。

我假设你的金钱修补的工作方式是这样的,当你测试一个需要修补某些东西的函数时,你希望将修补后的值恢复为其原始值(但从你的代码中并不清楚这正在被修复)完毕)。如果是这种情况,那么您必须在创建 ProcessPoolExecutor 实例时使用 initializer 参数提供

poolinitializer
函数。我不明白在这种情况下你如何在你想要测试的函数之外执行此操作,即在
test_multiprocess_f
中,这是不幸的。但是,如果您不需要恢复原始的修补值,或者您只是为使用 fork 方法创建新进程的平台(例如 Linux)进行编程,那么我将提出替代方案。

池初始化程序将指定在执行任何提交的任务之前在每个池进程中运行的函数。此初始化程序必须为每个子进程“修补”

MY_CONSTANT
(即使您使用 fork 方法创建新进程,这也将起作用)。

由于您尚未定义

monkeypatcher
是什么,所以我在下面创建了自己的猴子补丁。我还将您的两个文件合并为一个,以便可以复制和执行:

import concurrent.futures as ccf
from contextlib import contextmanager

MY_CONSTANT = "hello"

def do_patch():
    global MY_CONSTANT

    MY_CONSTANT = "world"

@contextmanager
def monkey_patcher():
    global MY_CONSTANT

    save_constant = MY_CONSTANT
    try:
        do_patch()
        yield None
    finally:
        MY_CONSTANT = save_constant

def init_pool_processes():
    # Do not use monkey_patcher because there is no need to restore the
    # original value of MY_CONSTANT and we would only want to restore the original value
    # when the patched value is no longer needed and there is no straightforwrd
    # way of knowing when to do this. The patched value will remain
    # until the child pool process terminates:
    do_patch()

def my_function():
    return MY_CONSTANT


def singleprocess_f():
    result = []
    for _ in range(3):
        result.append(my_function())
    return result


def multithread_f():
    result = []
    with ccf.ThreadPoolExecutor() as executor:
        futures = []
        for _ in range(3):
            future = executor.submit(my_function)
            futures.append(future)
        for future in ccf.as_completed(futures):
            result.append(future.result())
    return result


def multiprocess_f():
    result = []
    with ccf.ProcessPoolExecutor(initializer=init_pool_processes) as executor:
        futures = []
        for _ in range(3):
            future = executor.submit(my_function)
            futures.append(future)
        for future in ccf.as_completed(futures):
            result.append(future.result())
    return result

# PASSES:
def test_singleprocess_f():
    with monkey_patcher():
        result = singleprocess_f()
        assert result == ["world"] * 3

# PASSES:
def test_multithread_f():
    with monkey_patcher():
        result = multithread_f()
        assert result == ["world"] * 3

# PASSES:
def test_multiprocess_f():
    result = multiprocess_f()
    assert result == ["world"] * 3

if __name__ == '__main__':
    print('initial value =', repr(MY_CONSTANT))  # print value

    test_singleprocess_f()
    test_multithread_f()
    test_multiprocess_f()

    print('final value = ', repr(MY_CONSTANT))  # demonstrate that this value has been restored

打印:

initial value = 'hello'
final value =  'hello'

虽然上面的代码适用于使用 spawn (例如 Windows)或 fork (例如 Linux)创建子进程的平台,但如果您知道您正在使用 fork,那么您不需要池初始化程序函数,我们可以在

monkey_patcher
中使用
test_multiprocess_f
上下文管理器:

...


def multithread_f():
    result = []
    with ccf.ThreadPoolExecutor() as executor:
        futures = []
        for _ in range(3):
            future = executor.submit(my_function)
            futures.append(future)
        for future in ccf.as_completed(futures):
            result.append(future.result())
    return result

...


def test_multiprocess_f():
    with monkey_patcher():
        result = multiprocess_f()
        assert result == ["world"] * 3

如果您不需要将修补后的值恢复为原始值,则:

import concurrent.futures as ccf
from contextlib import contextmanager

MY_CONSTANT = "hello"


def my_function():
    return MY_CONSTANT


def singleprocess_f():
    result = []
    for _ in range(3):
        result.append(my_function())
    return result


def multithread_f():
    result = []
    with ccf.ThreadPoolExecutor() as executor:
        futures = []
        for _ in range(3):
            future = executor.submit(my_function)
            futures.append(future)
        for future in ccf.as_completed(futures):
            result.append(future.result())
    return result


def multiprocess_f():
    result = []
    with ccf.ProcessPoolExecutor() as executor:
        futures = []
        for _ in range(3):
            future = executor.submit(my_function)
            futures.append(future)
        for future in ccf.as_completed(futures):
            result.append(future.result())
    return result

# PASSES:
def test_singleprocess_f():
    result = singleprocess_f()
    assert result == ["world"] * 3

# PASSES:
def test_multithread_f():
    result = multithread_f()
    assert result == ["world"] * 3

# PASSES:
def test_multiprocess_f():
    result = multiprocess_f()
    assert result == ["world"] * 3


def do_patch():
    global MY_CONSTANT

    MY_CONSTANT = "world"


# Patch the value permanently before any tests are performed.
# If the spawn method is being used to create child processes, this will
# also patch these child processes:
do_patch()

if __name__ == '__main__':
    test_singleprocess_f()
    test_multithread_f()
    test_multiprocess_f()
© www.soinside.com 2019 - 2024. All rights reserved.