我编写了一个lambda脚本来使用python和boto3管理Amazon Machine Images的生命周期。该脚本运行良好,但是当我意识到必须为其编写单元测试时,我的噩梦开始了。我不是开发人员,习惯于以SysAdmin的身份编写脚本。
我已经为具有返回状态的函数创建了单元测试,如下所示,并且我可以正常工作。
def get_interface_wrapper(region, service, interface_type):
interface_types = ['client', 'resource']
interface = None
if (type(region) == str) and (type(service) == str) and (type(interface_type) == str) and (interface_type in interface_types):
interface = ("boto3." + interface_type +
"(" + "service_name=service," + "region_name=region)")
return interface
def get_interface(region, service, interface_type):
return eval(get_interface_wrapper(region, service, interface_type))
#Unit tests
def test_get_interface_client(self):
service = 'ec2'
interface_expression = 'boto3.client(service_name=service,region_name=region)'
client_interface = get_interface_wrapper(
self.region, service, 'client')
self.assertEqual(client_interface, interface_expression)
def test_get_interface_resource(self):
service = 'ec2'
interface_expression = 'boto3.resource(service_name=service,region_name=region)'
resource_interface = get_interface_wrapper(
self.region, service, 'resource')
self.assertEqual(resource_interface, interface_expression)
但是,对于以下没有return语句且依赖AWS终结点的函数,我正在竭尽全力。如何模拟终端节点或如何更改代码以创建不依赖AWS终端节点的单元测试。
def update_states(actions, ec2_client, logs_client, log_group, log_stream, dryrun_enabled=True):
for action in actions:
action.update({'phase': 'planning', 'PlanningTime': datetime.utcnow(
).strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'})
put_log_events(logs_client, log_group, log_stream, [action])
# The tag packer_ami_state_tagging_date is not set
if (action['is_timestamp_present'] == True):
if (action['action'] == 'update'):
# The tag packer_ami_state_tagging_date is set, so update the state and tagging date
try:
ec2_client.Image(action['ImageId']).create_tags(DryRun=dryrun_enabled, Tags=[{'Key': 'packer_ami_state', 'Value': action['new_packer_ami_state']},
{'Key': 'packer_ami_state_tagging_date', 'Value': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'}, ])
operation_result = [
{'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', 'Result': 'AMI state and tagging date was updated'}, ]
except Exception as e:
operation_result = [
{'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': (datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'), 'Result': 'AMI state and tagging date was not updated', 'Error': e.args[0], }]
finally:
put_log_events(logs_client, log_group,
log_stream, operation_result)
if (action['action'] == 'delete'):
image = ec2_client.Image(action['ImageId'])
snapshots = []
for blockDevMapping in image.block_device_mappings:
if 'Ebs' in blockDevMapping:
snapshots.append(blockDevMapping['Ebs']['SnapshotId'])
try:
image.deregister(DryRun=dryrun_enabled)
operation_result = [
{'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', 'Result': 'AMI was deregistered'}, ]
except Exception as e:
operation_result = [
{'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': (datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'), 'Result': 'AMI was not deregistered', 'Error': e.args[0], }]
finally:
put_log_events(logs_client, log_group,
log_stream, operation_result)
counter = 1
for snapshotID in snapshots:
snapshot = ec2_client.Snapshot(snapshotID)
try:
snapshot.delete(DryRun=dryrun_enabled)
operation_result = [
{'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', 'Result': 'SnapShot deleted', 'SnapShotID': snapshotID}, ]
except Exception as e:
operation_result = [
{'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': (datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'), 'Result': 'SnapShot not deleted', 'Error': e.args[0], 'SnapShotID': snapshotID}, ]
finally:
put_log_events(logs_client, log_group,
log_stream, operation_result)
counter += 1
if (action['action'] == 'none'):
action.update(
{'OperationDate': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', 'OperationResult': 'No action'})
operation_result = [
{'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', 'Result': 'No action'}, ]
put_log_events(logs_client, log_group,
log_stream, operation_result)
else:
try:
ec2_client.Image(action['ImageId']).create_tags(DryRun=dryrun_enabled, Tags=[
{'Key': 'packer_ami_state_tagging_date', 'Value': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'}, ])
operation_result = [
{'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z', 'Result': 'Tag created'}, ]
except Exception as e:
operation_result = [
{'phase': 'execution', 'imageid': action['ImageId'], 'ExecutionTime': (datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'), 'Result': 'Tag not created', 'Error': e.args[0], }]
finally:
put_log_events(logs_client, log_group,
log_stream, operation_result)
def put_log_events(client, log_group_name, log_stream_name, log_events):
log_stream = client.describe_log_streams(
logGroupName=log_group_name,
logStreamNamePrefix=log_stream_name
)
if (bool(log_stream['logStreams'])) and ('uploadSequenceToken' in log_stream['logStreams'][0]):
response = {
'nextSequenceToken': log_stream['logStreams'][0]['uploadSequenceToken']}
else:
response = {}
for log_event in log_events:
if bool(response):
response = client.put_log_events(
logGroupName=log_group_name,
logStreamName=log_stream_name,
logEvents=[
{
'timestamp': int(round(time.time() * 1000)),
'message': json.dumps(log_event)
},
],
sequenceToken=response['nextSequenceToken']
)
else:
response = client.put_log_events(
logGroupName=log_group_name,
logStreamName=log_stream_name,
logEvents=[
{
'timestamp': int(round(time.time() * 1000)),
'message': json.dumps(log_event)
},
],
)
我建议您在内置的unittest.mock library中使用修补程序。我用它来模拟所有的boto3调用,所以我从来没有打过真正的AWS服务。有很多选项,但这是一个模拟客户端的简单示例。
假设您在名为“ my_code”的模块中有代码,该模块导入boto3并调用get_parameters_by_path函数的“ ssm” boto3客户端。您可以使用如下代码来模拟:
@patch('api_lambda.api.my_code.boto3')
def test_secrets_load_ssm(self, mock_boto):
""" Normal Flow - source AWS SSM """
mock_client = MagicMock()
mock_boto.client.return_value = mock_client
mock_client.get_parameters_by_path.return_value = helper_function()
count = my_code.my_function_being_tested_that_fetches_a_parameter('TEST_APP', 'CI')
self.assertEqual(count, 1)
def helper_function():
return {'Parameters': [{'Name': '/TEST_APP/CI/secure_string_test',
'Type': 'SecureString',
'Value': 'secure string value',
'Version': 1,
'LastModifiedDate': datetime.datetime(2019, 8, 28, 14, 44, 26, 878000,
tzinfo=datetime.timezone.utc),
'ARN': 'arn:aws:ssm:us-east-1:072478573200:parameter/TEST_APP/CI/secure_string_test'}],
'ResponseMetadata': {'RequestId': 'b6f016a4-475d-40d2-a504-015d081d8603',
'HTTPStatusCode': 200,
'HTTPHeaders': {'x-amzn-requestid': 'b6f016a4-475d-40d2-a504-015d081d8603',
'content-type': 'application/x-amz-json-1.1',
'content-length': '666',
'date': 'Fri, 30 Aug 2019 16:57:17 GMT'},
'RetryAttempts': 0}
}
我将模型化的返回值放在单独的帮助器函数中,因为这不是本示例的重点,并且可以是您需要boto3进行原型化作为返回的任何JSON。如果您对单元测试模拟和补丁不熟悉,则必须稍微使用一下,但是我自己做完后,我可以证明它可以更优雅地解决此单元测试问题。 @patch批注可让您将真实的boto3库换成您创建的模型调用。