我正在使用存根来存根 stepfunction 的“get_execution_history”方法的响应,但它看起来也影响了 sqs 客户端。我已经使用 moto 库来模拟“sqs”服务。
我有我的 lambda_function.py
stepfunctionclient = boto3.client('stepfunctions', region_name='us-east-1')
sqsClient = boto3.client('sqs', region_name='us-east-1')
def lambda_handler(event, context):
# calling get_execution_history method
responseType = stepfunctionclient.get_execution_history(
executionArn="my_step_arn",
maxResults=1000,
reverseOrder=True,
)
# sending messages to sqs queue
sqsClient.send_message(QueueUrl="my_url", MessageBody="some_message")
然后我有我的 test_lambdaHandler.py
@mock_sqs
def test_lambdaHandler(
event,
sqsQueue,
dummyenvironmentVariables,
):
sqsQueue()
sfClient = boto3.client("stepfunctions", region_name="us-east-1")
stubber = botocore.stub.Stubber(sfClient)
execution_history = {
# mock response here
}
stubber.add_response(
'get_execution_history',
execution_history,
expected_params={
'executionArn': "my_step_arn",
'maxResults':1000,
'reverseOrder':True
}
)
stubber.activate()
with mock.patch.dict(os.environ, dummyenvironmentVariables, clear=True):
with mock.patch(boto3,'client', mock.MagicMock(return_value=sfClient)):
module = importlib.import_module(
"module_name"
).lambda_function
actualReturn = module.lambda_handler(event, None)
print(actualReturn)
我得到的错误是
AttributeError: 'SFN' object has no attribute 'send_message'
谁能帮我在不影响 sqs 客户端的情况下消除 get_execution_history 方法的响应。