单元测试lambda脚本

问题描述 投票:3回答:1

我编写了一个lambda脚本来使用python和boto3管理Amazon Machine Images的生命周期。该脚本运行良好,但是当我意识到必须为其编写单元测试时,我的噩梦开始了。我不是开发人员,习惯于以SysAdmin的身份编写脚本。

我已经为具有返回状态的函数创建了单元测试,如下所示,并且我可以正常工作。

def get_interface_wrapper(region, service, interface_type):
    interface_types = ['client', 'resource']
    interface = None

    if (type(region) == str) and (type(service) == str) and (type(interface_type) == str) and (interface_type in interface_types):
        interface = ("boto3." + interface_type +
                     "(" + "service_name=service," + "region_name=region)")

    return interface

def get_interface(region, service, interface_type):
    return eval(get_interface_wrapper(region, service, interface_type))

#Unit tests
def test_get_interface_client(self):

    service = 'ec2'
    interface_expression = 'boto3.client(service_name=service,region_name=region)'
    client_interface = get_interface_wrapper(
        self.region, service, 'client')
    self.assertEqual(client_interface, interface_expression)


def test_get_interface_resource(self):

    service = 'ec2'
    interface_expression = 'boto3.resource(service_name=service,region_name=region)'
    resource_interface = get_interface_wrapper(
        self.region, service, 'resource')
    self.assertEqual(resource_interface, interface_expression)

但是,对于以下没有return语句且依赖AWS终结点的函数,我正在竭尽全力。如何模拟终端节点或如何更改代码以创建不依赖AWS终端节点的单元测试。

def update_states(actions, ec2_client, logs_client, log_group, log_stream, dryrun_enabled=True):
    for action in actions:

        action.update({'phase': 'planning', 'PlanningTime': datetime.utcnow(
        ).strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'})
        put_log_events(logs_client, log_group, log_stream, [action])

        # The tag packer_ami_state_tagging_date is not set
        if (action['is_timestamp_present'] == True):

            if (action['action'] == 'update'):
                # The tag packer_ami_state_tagging_date is set, so update the state and tagging date
                try:

                    ec2_client.Image(action['ImageId']).create_tags(DryRun=dryrun_enabled, Tags=[{'Key': 'packer_ami_state', 'Value': action['new_packer_ami_state']},
                                                                                                 {'Key': 'packer_ami_state_tagging_date', 'Value': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'}, ])

                    operation_result = [
                        {'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime':  datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', 'Result': 'AMI state and tagging date was updated'}, ]

                except Exception as e:

                    operation_result = [
                        {'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': (datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'), 'Result': 'AMI state and tagging date was not updated', 'Error': e.args[0], }]

                finally:
                    put_log_events(logs_client, log_group,
                                   log_stream, operation_result)

            if (action['action'] == 'delete'):
                image = ec2_client.Image(action['ImageId'])
                snapshots = []
                for blockDevMapping in image.block_device_mappings:
                    if 'Ebs' in blockDevMapping:
                        snapshots.append(blockDevMapping['Ebs']['SnapshotId'])

                try:
                    image.deregister(DryRun=dryrun_enabled)
                    operation_result = [
                        {'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime':  datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', 'Result': 'AMI was deregistered'}, ]

                except Exception as e:
                    operation_result = [
                        {'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': (datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'), 'Result': 'AMI was not deregistered', 'Error': e.args[0], }]

                finally:
                    put_log_events(logs_client, log_group,
                                   log_stream, operation_result)

                counter = 1
                for snapshotID in snapshots:
                    snapshot = ec2_client.Snapshot(snapshotID)

                    try:
                        snapshot.delete(DryRun=dryrun_enabled)
                        operation_result = [
                            {'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime':  datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', 'Result': 'SnapShot deleted', 'SnapShotID': snapshotID}, ]

                    except Exception as e:
                        operation_result = [
                            {'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': (datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'), 'Result': 'SnapShot not deleted', 'Error': e.args[0], 'SnapShotID': snapshotID}, ]

                    finally:
                        put_log_events(logs_client, log_group,
                                       log_stream, operation_result)

                    counter += 1

            if (action['action'] == 'none'):
                action.update(
                    {'OperationDate': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', 'OperationResult': 'No action'})

                operation_result = [
                    {'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime':  datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', 'Result': 'No action'}, ]

                put_log_events(logs_client, log_group,
                               log_stream, operation_result)

        else:
            try:
                ec2_client.Image(action['ImageId']).create_tags(DryRun=dryrun_enabled, Tags=[
                    {'Key': 'packer_ami_state_tagging_date', 'Value': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'}, ])

                operation_result = [
                    {'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime':  datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', 'Result': 'Tag created'}, ]

            except Exception as e:
                operation_result = [
                    {'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': (datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'), 'Result': 'Tag not created', 'Error': e.args[0], }]

            finally:
                put_log_events(logs_client, log_group,
                               log_stream, operation_result)


def put_log_events(client, log_group_name, log_stream_name, log_events):
    log_stream = client.describe_log_streams(
        logGroupName=log_group_name,
        logStreamNamePrefix=log_stream_name
    )

    if (bool(log_stream['logStreams'])) and ('uploadSequenceToken' in log_stream['logStreams'][0]):
        response = {
            'nextSequenceToken': log_stream['logStreams'][0]['uploadSequenceToken']}
    else:
        response = {}

    for log_event in log_events:
        if bool(response):
            response = client.put_log_events(
                logGroupName=log_group_name,
                logStreamName=log_stream_name,
                logEvents=[
                    {
                        'timestamp': int(round(time.time() * 1000)),
                        'message': json.dumps(log_event)
                    },
                ],
                sequenceToken=response['nextSequenceToken']
            )
        else:
            response = client.put_log_events(
                logGroupName=log_group_name,
                logStreamName=log_stream_name,
                logEvents=[
                    {
                        'timestamp': int(round(time.time() * 1000)),
                        'message': json.dumps(log_event)
                    },
                ],
            )
python amazon-web-services unit-testing boto3
1个回答
0
投票

我建议您在内置的unittest.mock library中使用修补程序。我用它来模拟所有的boto3调用,所以我从来没有打过真正的AWS服务。有很多选项,但这是一个模拟客户端的简单示例。

假设您在名为“ my_code”的模块中有代码,该模块导入boto3并调用get_parameters_by_path函数的“ ssm” boto3客户端。您可以使用如下代码来模拟:

@patch('api_lambda.api.my_code.boto3')
def test_secrets_load_ssm(self, mock_boto):
    """ Normal Flow - source AWS SSM """
    mock_client = MagicMock()

    mock_boto.client.return_value = mock_client
    mock_client.get_parameters_by_path.return_value = helper_function()

    count = my_code.my_function_being_tested_that_fetches_a_parameter('TEST_APP', 'CI')
    self.assertEqual(count, 1)


def helper_function():
    return {'Parameters': [{'Name': '/TEST_APP/CI/secure_string_test',
                            'Type': 'SecureString',
                            'Value': 'secure string value',
                            'Version': 1,
                            'LastModifiedDate': datetime.datetime(2019, 8, 28, 14, 44, 26, 878000,
                                                                  tzinfo=datetime.timezone.utc),
                            'ARN': 'arn:aws:ssm:us-east-1:072478573200:parameter/TEST_APP/CI/secure_string_test'}],
            'ResponseMetadata': {'RequestId': 'b6f016a4-475d-40d2-a504-015d081d8603',
                                 'HTTPStatusCode': 200,
                                 'HTTPHeaders': {'x-amzn-requestid': 'b6f016a4-475d-40d2-a504-015d081d8603',
                                                 'content-type': 'application/x-amz-json-1.1',
                                                 'content-length': '666',
                                                 'date': 'Fri, 30 Aug 2019 16:57:17 GMT'},
                                 'RetryAttempts': 0}
            }

我将模型化的返回值放在单独的帮助器函数中,因为这不是本示例的重点,并且可以是您需要boto3进行原型化作为返回的任何JSON。如果您对单元测试模拟和补丁不熟悉,则必须稍微使用一下,但是我自己做完后,我可以证明它可以更优雅地解决此单元测试问题。 @patch批注可让您将真实的boto3库换成您创建的模型调用。

© www.soinside.com 2019 - 2024. All rights reserved.