模拟 python k8s 客户端

问题描述 投票:0回答:1

这是我的片段:

文件夹结构

utils/
├── __init__.py
├── k8s_client.py
├── simple.py
└── tests
    ├── k8s_client_test.py

k8s_client.py

import os
from kubernetes import client, config

config.load_kube_config(os.getenv('KUBECONFIG')) if os.getenv(
    'KUBECONFIG'
) else config.load_incluster_config()


class K8sClient:

    def __init__(self) -> None:
        self.k8s_api_client = client.CustomObjectsApi()

    def get_crs(self) -> list:
  
        custom_resources = self.k8s_api_client.list_cluster_custom_object(
            group=os.environ['GROUP'],
            version=os.environ['VERSION'],
            plural=os.environ['PLURAL'],
        )
        # list with all custom resources of certain type
        return custom_resources["items"]

样本.py

from utils.k8s_client import K8sClient


def initialize_config():

    return K8sClient().get_crs()

最后我尝试对

initialize_config
功能进行单元测试。这是我的测试代码:

from utils.simple import initialize_config
from mock import patch


@patch('utils.k8s_client.K8sClient.get_crs')
@patch('utils.k8s_client.config.load_incluster_config')
def test_sample(kube_config_mock, k8s_client_mock):
    custom_resources = initialize_config()

    assert k8s_client_mock.assert_called_once()

这是我得到的错误:

kubernetes.config.config_exception.ConfigException: Service host/port is not set.

这是因为根本不导出 kubeconfig,但我不想这样做,也不希望对 k8s 集群进行真正的调用以从那里获取真正的自定义资源。我只是想伪造这些调用和 kubeconfig 导出,在我的例子中看看

k8s_client_mock
是否被调用,如果我处理它,我可以向
k8s_client_mock
提供返回值以断言其长度等等。但主要思想是如何伪造这个 k8s 客户端并按照我的意愿管理它。

python unit-testing kubernetes
1个回答
0
投票

utils.k8s_client.py 中未定义“config.load_incluster_config”

应该是“kubernetes.config.load_incluster_config”

@patch('kubernetes.config.load_incluster_config')
@patch('xxxxxx')
def sample(kube_config_mock, xxxx):
    pass
© www.soinside.com 2019 - 2024. All rights reserved.