我想对从 aws 导入的文件进行测试。 我使用 moto 模拟 s3,以免弄乱实际数据。然而,现在 aws 似乎是空的,因此我决定在模拟的 s3 上上传一些测试文件。我怎样才能做到这一点?
这是我的设置,
竞赛.py:
@pytest.fixture(scope='function')
def aws_credentials():
"""Mocked AWS Credentials for moto."""
os.environ['AWS_ACCESS_KEY_ID'] = 'testing'
os.environ['AWS_SECRET_ACCESS_KEY'] = 'testing'
os.environ['AWS_SECURITY_TOKEN'] = 'testing'
os.environ['AWS_SESSION_TOKEN'] = 'testing'
@pytest.fixture(scope='function')
def s3(aws_credentials):
with mock_s3():
yield boto3.client('s3', region_name='us-east-1')
测试文件:
class TestLoadRecommendations:
@pytest.fixture(autouse=True)
def _setup(self, s3):
self.bucket = s3.create_bucket(Bucket=settings.SERVER_DATA_S3_BUCKET)
self.bucket.upload_file("tmp/recommendations-2021-04-13T17_28_06.csv", "tmp/recommendations-2021-04-13T17_28_06.csv")
但是,上传它会引发错误
TypeError: expected string or bytes-like object
,但我确信我使用了错误的命令来上传文件。
有人可以帮忙吗?谢谢!
将文件上传到 S3 有多种方法。您的示例结合了 S3 资源和 S3 客户端方法,这是行不通的。
请参阅以下代码作为示例:
三路通罗马。
import boto3
from moto import mock_s3
BUCKET_NAME = "mybucket"
FILE_NAME = "red.jpg"
FILE_LOCATION = FILE_NAME
@mock_s3
def test_client():
create_bucket()
s3 = boto3.client('s3')
with open(FILE_LOCATION, 'rb') as data:
s3.upload_fileobj(data, BUCKET_NAME, FILE_NAME)
verify_upload()
@mock_s3
def test_resource():
s3_resource, _ = create_bucket()
s3_resource.meta.client.upload_file(FILE_LOCATION, BUCKET_NAME, FILE_NAME)
#
verify_upload()
@mock_s3
def test_bucket_resource():
_, bucket = create_bucket()
bucket.upload_file(FILE_LOCATION, FILE_NAME)
#
verify_upload()
def verify_upload():
client = boto3.client("s3")
resp = client.get_object(Bucket=BUCKET_NAME, Key=FILE_NAME)
content_length = resp["ResponseMetadata"]["HTTPHeaders"]["content-length"]
print("Content-Length: {}".format(content_length))
def create_bucket():
s3 = boto3.resource("s3")
bucket = s3.create_bucket(Bucket=BUCKET_NAME)
return s3, bucket
注意:我正在使用装饰器,但这些示例使用 Moto 固定装置时的工作方式完全相同。
如果有人使用 conftest.py 文件来创建测试工件, 这是我的解决方案。
注:
如果您的文件位于 S3 存储桶中的文件夹中,我还展示了如何上传
我的文件夹结构是:根目录下的公共测试文件夹。
摩托版本:moto==4.2.3
@pytest.fixture(autouse=True)
def s3_bucket():
"""This fixture creates a local s3 bucket tests"""
with moto.mock_s3():
s3 = boto3.resource("s3", region_name=default_location)
s3_resource = s3.create_bucket(
Bucket=os.environ["PRODUCT_IMG_BUCKET_NAME"],
CreateBucketConfiguration={"LocationConstraint": default_location},
)
s3.meta.client.head_bucket(
Bucket=os.environ["PRODUCT_IMG_BUCKET_NAME"]
)
my_bucket = s3.Bucket(
os.environ["PRODUCT_IMG_BUCKET_NAME"]
)
test_upload_file_path = str(Path(__file__).parent / "MyFolder" / "1.png")
# upload a file into the bucket.
s3_resource.meta.client.upload_file(
test_upload_file_path,
os.environ["PRODUCT_IMG_BUCKET_NAME"],
"1.png",
)
verify_upload(
os.environ["PRODUCT_IMG_BUCKET_NAME"],
"1.png",
)
yield my_bucket
def verify_upload(bucket_name, file_name):
client = boto3.client("s3")
resp = client.get_object(Bucket=bucket_name, Key=file_name)
content_length = resp["ResponseMetadata"]["HTTPHeaders"]["content-length"]