如何使用aiobotocore模拟AWS S3

问题描述 投票:0回答:7

我有一个使用 aiohttp 和 aiobotocore 来处理 AWS 中的资源的项目。我正在尝试测试与 AWS S3 一起使用的类,并且我正在使用 moto 来模拟 AWS。模拟对于使用同步代码的示例效果很好(来自 moto 文档的示例)

import boto3
from moto import mock_s3


class MyModel(object):
    def __init__(self, name, value):
        self.name = name
        self.value = value

    def save(self):
        s3 = boto3.client('s3', region_name='us-east-1')
        s3.put_object(Bucket='mybucket', Key=self.name, Body=self.value)


def test_my_model_save():
    with mock_s3():
        conn = boto3.resource('s3', region_name='us-east-1')
        conn.create_bucket(Bucket='mybucket')

        model_instance = MyModel('steve', 'is awesome')
        model_instance.save()
        body = conn.Object('mybucket', 'steve').get()['Body'].read().decode("utf-8")

        assert body == 'is awesome'

但是,重写此代码以使用 aiobotocore 模拟后不起作用 - 在我的示例中它连接到真正的 AWS S3。

import aiobotocore
import asyncio

import boto3
from moto import mock_s3


class MyModel(object):
    def __init__(self, name, value):
        self.name = name
        self.value = value

    async def save(self, loop):
        session = aiobotocore.get_session(loop=loop)
        s3 = session.create_client('s3', region_name='us-east-1')
        await s3.put_object(Bucket='mybucket', Key=self.name, Body=self.value)


def test_my_model_save():
    with mock_s3():
        conn = boto3.resource('s3', region_name='us-east-1')
        conn.create_bucket(Bucket='mybucket')
        loop = asyncio.get_event_loop()

        model_instance = MyModel('steve', 'is awesome')
        loop.run_until_complete(model_instance.save(loop=loop))
        body = conn.Object('mybucket', 'steve').get()['Body'].read().decode("utf-8")

        assert body == 'is awesome'

所以我的假设是 moto 无法与 aiobotocore 正常工作。如果我的源代码类似于第二个示例,我如何才能有效地模拟 AWS 资源?

python python-3.x python-asyncio aiohttp moto
7个回答
13
投票

moto

 的模拟不起作用,因为它们使用同步 API。但是,您可以启动 
moto
 服务器并配置 
aiobotocore
 以连接到此测试服务器。
查看 aiobotocore 测试以获取灵感。


3
投票
使用 AWS 的存根应该可以解决问题。以下是我在龙卷风应用程序中执行 aws 读取操作的方法:

import aiobotocore from botocore.stub import Stubber from tornado.testing import AsyncTestCase from aiobotocore.response import StreamingBody class RawStream(io.BytesIO): async def __aenter__(self): return self async def __aexit__(self, exc_type, exc_val, exc_tb): pass async def read(self, n): return super().read(n) class S3TestCase(AsyncTestCase): def setUp(self): super().setUp() session = aiobotocore.get_session() self.client = session.create_client("s3", region_name="AWS_S3_REGION", aws_secret_access_key="AWS_SECRET_ACCESS_KEY", aws_access_key_id="AWS_ACCESS_KEY_ID") @tornado.testing.gen_test def test_read(self): stubber = Stubber(self.client) stubber.add_response("get_object", {"Body": StreamingBody(raw_stream=RawStream(self.binary_content), content_length=128), "ContentLength": 128}, expected_params={"Bucket": "AWS_S3_BUCKET", "Key": "filename"}) stubber.activate() response = await client.get_object(Bucket="AWS_S3_BUCKET", Key="filename")

写入操作应该类似。希望这能引导您走向正确的方向。

有关存根的更多信息:

https://botocore.amazonaws.com/v1/documentation/api/latest/reference/stubber.html


3
投票
我认为 Sebastian Brestins 的答案应该是公认的。我将发布这个新答案,因为自发布以来有些事情发生了变化,例如python 3.8 现在支持异步测试用例,aioboto3 客户端现在是上下文管理器。

使用 python 3.8 的最小示例如下所示:

from unittest import IsolatedAsyncioTestCase import aioboto3 from botocore.stub import Stubber class Test(IsolatedAsyncioTestCase): async def asyncSetUp(self): self._s3_client = await aioboto3.client('s3').__aenter__() self._s3_stub = Stubber(self._s3_client) async def asyncTearDown(self): await self._s3_client.__aexit__(None, None, None) async def test_case(self): self._s3_stub.add_response( "get_object", {"Body": "content"}, expected_params={"Bucket": "AWS_S3_BUCKET", "Key": "filename"} ) self._s3_stub.activate() response = await self._s3_client.get_object(Bucket="AWS_S3_BUCKET", Key="filename") self.assertEquals(response, "content")
    

3
投票
不幸的是,这不是一个完整的答案,但有一个添加此功能的拉取请求已开放 6 个月:

https://github.com/aio-libs/aiobotocore/pull/766

当我处理类似的 asyncio 问题时,我为同步对象手动编码了“异步”包装器。


2
投票
这是来自 aiobotocore 的mock_server.py,没有 pytest:

# Initially from https://raw.githubusercontent.com/aio-libs/aiobotocore/master/tests/mock_server.py import shutil import signal import subprocess as sp import sys import time import requests _proxy_bypass = { "http": None, "https": None, } def start_service(service_name, host, port): moto_svr_path = shutil.which("moto_server") args = [sys.executable, moto_svr_path, service_name, "-H", host, "-p", str(port)] process = sp.Popen(args, stdin=sp.PIPE, stdout=sp.PIPE, stderr=sp.DEVNULL) url = "http://{host}:{port}".format(host=host, port=port) for _ in range(30): if process.poll() is not None: break try: # we need to bypass the proxies due to monkeypatches requests.get(url, timeout=0.1, proxies=_proxy_bypass) break except requests.exceptions.RequestException: time.sleep(0.1) else: stop_process(process) raise AssertionError("Can not start service: {}".format(service_name)) return process def stop_process(process, timeout=20): try: process.send_signal(signal.SIGTERM) process.communicate(timeout=timeout / 2) except sp.TimeoutExpired: process.kill() outs, errors = process.communicate(timeout=timeout / 2) exit_code = process.returncode msg = "Child process finished {} not in clean way: {} {}" \ .format(exit_code, outs, errors) raise RuntimeError(msg)
    

2
投票
我们可以使用 moto[server] 创建 S3 服务器,然后用它创建一个 pytest 固定装置,类似于

aioboto3

@pytest.yield_fixture(scope='session') def s3_server(): host = 'localhost' port = 5002 url = 'http://{host}:{port}'.format(host=host, port=port) process = start_service('s3', host, port) yield url stop_process(process)
然后 

patch('aiobotocore.AioSession.create_client')

 return_value 与 
aiobotocore.get_session().create_client('s3', region_name='us-east-1', end_point_url=s3_server)
 如下

async with aiobotocore.get_session().create_client('s3', region_name='us-east-1', end_point_url=s3_server) as client: with patch('aiobotocore.AioSession.create_client') as mock: mock.return_value = client # Test your code
    

0
投票
这是一个使用

moto

 包为您完成此操作的装置:

`` pip install moto[服务器]

导入moto.server

@pytest.fixture 异步 def s3_server(): 从 moto.server 导入 ThreadedMotoServer 服务器= ThreadedMotoServer(端口= 9736) 服务器.start() os.environ["AWS_ENDPOINT_URL"] = "http://127.0.0.1:9736"

s3 = await get_s3() await s3.create_bucket(Bucket=USER_BUCKET_NAME) yield del os.environ["AWS_ENDPOINT_URL"] server.stop()

You can include this fixture in a test, and it will make aws "work" for that test.
    
© www.soinside.com 2019 - 2024. All rights reserved.