使用 Moto 测试 AWS 电子邮件 - 无法使其正常工作

问题描述 投票:0回答:1

我正在尝试使用 moto 来测试通过 AWS 发送电子邮件。文档很差,而且有些东西已经变了——比如 @mock_ses 似乎不再存在了。坦白说,我对 mock(模拟)的理解也很有限。

这是被测试的函数:

import logging
from typing import Iterable, Literal

import boto3.session

# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)

def send_email(boto3_session: boto3.session.Session, to: Iterable[str], subject: str, body: str, body_format: Literal['Html', 'Text'] = 'Html', cc: Iterable[str] = None, from_email: str = '[email protected]'):
    """Send an email through Amazon SES using the supplied boto3 session.

    The SES client is created from ``boto3_session`` for region
    ``us-east-1`` and the SES message id is written to the debug log.

    Args:
        boto3_session: Session used to create the SES client.
        to: Primary recipient addresses. Any iterable is accepted (list,
            tuple, generator, ...); a single string is treated as one
            address.
        subject: Subject line of the message.
        body: Message body.
        body_format: Key used under ``Message.Body``: ``'Html'`` or
            ``'Text'``.
        cc: Optional carbon-copy addresses; left out of the request when
            ``None`` or empty.
        from_email: Address used as the SES ``Source``.

    Returns:
        None.
    """
    # `to` and `cc` are typed as Iterable, but they are sent in the request
    # and also indexed for logging, so materialize them into lists once.
    # A bare string is one address, not an iterable of characters.
    to_addresses = [to] if isinstance(to, str) else list(to)
    if not cc:
        cc_addresses = []
    elif isinstance(cc, str):
        cc_addresses = [cc]
    else:
        cc_addresses = list(cc)

    # Create the email message
    send_args = {
        'Source': from_email,
        'Destination': {
            'ToAddresses': to_addresses,
        },
        'Message': {
            'Subject': {'Data': subject},
            'Body': {body_format: {'Data': body}}
        }
    }
    if cc_addresses:
        send_args['Destination']['CcAddresses'] = cc_addresses

    response = boto3_session.client('ses', region_name="us-east-1").send_email(**send_args)
    message_id = response['MessageId']

    # Do not assume `to` is non-empty: indexing it blindly would raise
    # IndexError *after* the message had already been handed to SES.
    recipients = to_addresses + cc_addresses
    first_recipient = recipients[0] if recipients else None
    logger.debug(
        "Sent mail %s to %s.", message_id, first_recipient)

这是我的测试函数:

import os

import boto3
from moto import mock_aws
from moto.core import DEFAULT_ACCOUNT_ID
from moto.ses import ses_backends

import pytest
import pytest_check as ptc

import rbn_lib.comms as comms


# You need to have the secrets, key_id, and key in your environment. This tests against the live DB.
# NOTE(review): the fixtures below set dummy credentials and the test is
# decorated with mock_aws, so the remark above looks stale -- confirm.

# AWS region used for the dummy environment and for the moto SES backend lookup.
DEFAULT_REGION = "us-east-1"

@pytest.fixture
def aws_credentials():
    """Populate the process environment with placeholder AWS settings.

    Each credential variable is set to the dummy value ``"testing"`` and
    the default region is set to ``DEFAULT_REGION``.
    """
    placeholder_env = {
        "AWS_ACCESS_KEY_ID": "testing",
        "AWS_SECRET_ACCESS_KEY": "testing",
        "AWS_SECURITY_TOKEN": "testing",
        "AWS_SESSION_TOKEN": "testing",
        "AWS_DEFAULT_REGION": DEFAULT_REGION,
    }
    os.environ.update(placeholder_env)


@pytest.fixture
def sess(aws_credentials):
    """Yield a boto3 session created while moto's AWS mock is active.

    Depends on ``aws_credentials`` so the dummy environment is in place
    before the session is built.
    """
    # Decorating a pytest fixture with `@mock_aws` wraps the fixture object
    # instead of keeping the mock running for the test that consumes it.
    # Entering the mock as a context manager around the `yield` keeps it
    # active until the fixture is torn down.
    with mock_aws():
        yield boto3.Session()


@mock_aws
def test_send_email(sess):
    """Send one message via ``comms.send_email`` and check the recorded subject.

    The subject of the first message stored by the moto SES backend for the
    default account and region is compared with the subject that was sent.

    NOTE(review): nothing here verifies the sender identity with SES before
    sending, so the mocked backend may reject the call with MessageRejected.
    """
    recipients = ["test@test_email.us"]
    subject, body = "Test", "Test"

    comms.send_email(sess, recipients, subject, body)

    backend = ses_backends[DEFAULT_ACCOUNT_ID][DEFAULT_REGION]
    first_sent = backend.sent_messages[0]
    ptc.equal(first_sent.subject, subject)

我收到错误,表明 aws 实际上正在被调用:

self = <botocore.client.SES object at 0x000002542F110310>
operation_name = 'SendEmail'
api_params = {'Destination': {'ToAddresses': ['test@test_email.us']}, 'Message': {'Body': {'Html': {'Data': 'Test'}}, 'Subject': {'Data': 'Test'}}, 'Source': '[email protected]'}

    def _make_api_call(self, operation_name, api_params):
        operation_model = self._service_model.operation_model(operation_name)
        service_name = self._service_model.service_name
        history_recorder.record(
            'API_CALL',
            {
                'service': service_name,
                'operation': operation_name,
                'params': api_params,
            },
        )
        if operation_model.deprecated:
            logger.debug(
                'Warning: %s.%s() is deprecated', service_name, operation_name
            )
        request_context = {
            'client_region': self.meta.region_name,
            'client_config': self.meta.config,
            'has_streaming_input': operation_model.has_streaming_input,
            'auth_type': operation_model.auth_type,
        }
        api_params = self._emit_api_params(
            api_params=api_params,
            operation_model=operation_model,
            context=request_context,
        )
        (
            endpoint_url,
            additional_headers,
            properties,
        ) = self._resolve_endpoint_ruleset(
            operation_model, api_params, request_context
        )
        if properties:
            # Pass arbitrary endpoint info with the Request
            # for use during construction.
            request_context['endpoint_properties'] = properties
        request_dict = self._convert_to_request_dict(
            api_params=api_params,
            operation_model=operation_model,
            endpoint_url=endpoint_url,
            context=request_context,
            headers=additional_headers,
        )
        resolve_checksum_context(request_dict, operation_model, api_params)
    
        service_id = self._service_model.service_id.hyphenize()
        handler, event_response = self.meta.events.emit_until_response(
            'before-call.{service_id}.{operation_name}'.format(
                service_id=service_id, operation_name=operation_name
            ),
            model=operation_model,
            params=request_dict,
            request_signer=self._request_signer,
            context=request_context,
        )
    
        if event_response is not None:
            http, parsed_response = event_response
        else:
            maybe_compress_request(
                self.meta.config, request_dict, operation_model
            )
            apply_request_checksum(request_dict)
            http, parsed_response = self._make_request(
                operation_model, request_dict, request_context
            )
    
        self.meta.events.emit(
            'after-call.{service_id}.{operation_name}'.format(
                service_id=service_id, operation_name=operation_name
            ),
            http_response=http,
            parsed=parsed_response,
            model=operation_model,
            context=request_context,
        )
    
        if http.status_code >= 300:
            error_info = parsed_response.get("Error", {})
            error_code = error_info.get("QueryErrorCode") or error_info.get(
                "Code"
            )
            error_class = self.exceptions.from_code(error_code)
>           raise error_class(parsed_response, operation_name)
E           botocore.errorfactory.MessageRejected: An error occurred (MessageRejected) when calling the SendEmail operation: Email address not verified [email protected]
python email pytest boto3 moto
1个回答
0
投票

在发送电子邮件之前,必须先验证发件人的电子邮件地址。AWS 本身就是这样工作的,Moto 也模拟了相同的行为。

您必须先调用 verify_email_identity 函数:

@mock_aws
def test_send_email(sess):
    """Verify the sender identity, send one message, and check its subject.

    The sender address is registered with the mocked SES service first;
    the message is then sent and the subject of the first message recorded
    by the moto SES backend is compared with the one that was sent.
    """
    recipients = ["test@test_email.us"]
    subject, body = "Test", "Test"

    # The sender must be a verified identity before SES accepts the send.
    ses_client = sess.client('ses', region_name="us-east-1")
    ses_client.verify_email_identity(EmailAddress="[email protected]")

    send_email(sess, recipients, subject, body)

    backend = ses_backends[DEFAULT_ACCOUNT_ID][DEFAULT_REGION]
    first_sent = backend.sent_messages[0]
    ptc.equal(first_sent.subject, subject)
© www.soinside.com 2019 - 2024. All rights reserved.