How do I mock S3 with pytest? The unit test calls the real function instead of the mock


I am trying to test a function against a mocked S3 resource, but the test ends up calling the real AWS API instead.

Here is the code I want to test:

import sys
import requests
import json
import pandas as pd
import time
import datetime
import logging
import traceback
from pyspark import SparkContext, SparkConf
import math
from botocore.exceptions import ClientError
from pyspark.sql import SparkSession
import boto3



# conf = SparkConf().setMaster('local').setAppName('My App')
# sc = SparkContext(conf=conf)

# # Create a SparkSession from the SparkContext
# spark = SparkSession(sc)

# Initialize boto3
s3 = boto3.resource("s3")

# Set up logging
logging.basicConfig(level=logging.INFO)


class Rivirproductlist:
    
    """  
    This class contains different functionalities to generate the access token,log and Fetch the Raw Json data 
    from Product Item Management list in Rivir API.
    
    """

    def __init__(
            self,
            ApplicationID,
            secret_name,
            region_name,
            batchsize,
            stageS3Bucket,
            logs3key,
            rootfolder,
            configs3key,
            s3Key
        ):
        """
        Constructor method to initialize class attributes.
        """

        # self.spark = SparkSession.builder.appName(ApplicationID).getOrCreate()
        self.ApplicationID = ApplicationID
        self.secret_name = secret_name
        self.region_name = region_name
        self.batchsize = int(batchsize)
        self.stageS3Bucket = stageS3Bucket
        self.logs3key = logs3key
        self.rootfolder = rootfolder
        self.configs3key = configs3key
        self.s3Key = s3Key

    def save_to_s3(self, bucket, key, json_data):
        """
        Save the raw JSON data to S3.
        """
        try:
            s3object = s3.Object(bucket, key)
            s3object.put(Body=json.dumps(json_data, indent=2).encode('UTF-8'))
        except Exception as e:
            raise Exception(f"Failed to save to S3. Error: {e}")
        

if __name__ == '__main__':

    # getResolvedOptions is only available inside the AWS Glue runtime,
    # so it is imported here rather than at module level.
    from awsglue.utils import getResolvedOptions

    # The arguments that include the Glue job parameters.
    args = getResolvedOptions(
        sys.argv,
        [
            "ApplicationID",
            "secret_name",
            "region_name",
            "batchsize",
            "stageS3Bucket",
            "logs3key",
            "rootfolder",
            "configs3key",
            "s3Key",
        ],
    )
    
    # Extracting and assigning the configuration options to variables.
    ApplicationID = args["ApplicationID"]
    secret_name = args["secret_name"]
    region_name = args["region_name"]
    batchsize = args["batchsize"]
    stageS3Bucket = args["stageS3Bucket"]
    logs3key = args["logs3key"]
    rootfolder = args["rootfolder"]
    configs3key = args["configs3key"]
    s3Key = args["s3Key"]
    
    # Creating an instance of Rivirproductlist with the specified configuration
    obj = Rivirproductlist(

        ApplicationID,
        secret_name,
        region_name,
        batchsize,
        stageS3Bucket,
        logs3key,
        rootfolder,
        configs3key,
        s3Key
    )
    
    # Execute the main processing function of the object.
    # save_to_s3 expects (bucket, key, json_data); the JSON payload would
    # come from the API-fetch step, which is omitted from this excerpt.
    obj.save_to_s3(stageS3Bucket, s3Key, {})

The code runs as an AWS Glue job, but I am testing it locally.
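Since awsglue is not installed locally, one way to keep the import working outside Glue (the commented-out lines in the test script below hint at this) is to stub those modules before importing the module under test. A minimal sketch:

import sys
from unittest.mock import MagicMock

# Stub the Glue-only modules so 'import s3_test' works outside the Glue runtime
sys.modules["awsglue"] = MagicMock()
sys.modules["awsglue.transforms"] = MagicMock()
sys.modules["awsglue.utils"] = MagicMock()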

Unit test script:

import unittest
from unittest.mock import patch, MagicMock
import json
import boto3
from pyspark.sql import SparkSession, DataFrame
from s3_test import Rivirproductlist  # replace with the actual module name
import sys
from botocore.exceptions import ClientError
from datetime import datetime
from moto import mock_aws
import pytest


#sys.modules["awsglue.transforms"] = MagicMock()
#sys.modules["awsglue.utils"] = MagicMock()

class TestRivirproductlist(unittest.TestCase):
    @patch('pyspark.sql.SparkSession')
    def setUp(self, mock_spark):
        self.mock_spark = mock_spark
        self.rivirproductlist = Rivirproductlist(
            ApplicationID='test',
            secret_name='test_secret',
            region_name='us-west-2',
            batchsize=100,
            stageS3Bucket='test_bucket',
            logs3key='test_log',
            rootfolder='test_root',
            configs3key='test_config',
            s3Key='test_key'
        )

    @mock_aws
    def test_save_to_s3(self):
        # Given
        bucket = 'test_bucket'
        key = 'test_key'
        # Create the bucket inside the mocked context so moto can serve it.
        s3_client = boto3.client('s3', region_name='us-east-1')
        s3_client.create_bucket(Bucket=bucket)

        # Initialize the Rivirproductlist object
        rivirproductlist = Rivirproductlist(
            ApplicationID='test',
            secret_name='test_secret',
            region_name='us-west-2',
            batchsize=100,
            stageS3Bucket='test_bucket',
            logs3key='test_log',
            rootfolder='test_root',
            configs3key='test_config',
            s3Key='test_key'
        )

        # When
        rivirproductlist.save_to_s3(bucket, key, {'key': 'value'})

        # Then
        response = s3_client.get_object(Bucket=bucket, Key=key)
        data = response['Body'].read().decode('utf-8')
        self.assertEqual(data, json.dumps({'key': 'value'}, indent=2))

if __name__ == '__main__':
    unittest.main()

When I provide credentials, it gives me this error:

AWS Access Key Id you provided does not exist in our records

and when I don't provide credentials, it gives:

Unable to locate credentials

Why does it try to call the real S3 instead of the mock?

python unit-testing pytest python-unittest.mock
1 Answer

Something is probably still talking to the real AWS; make sure your clients are created against the mocked setup. A likely culprit here: s3 = boto3.resource("s3") runs at module import time, before @mock_aws starts patching, and moto can only intercept clients and resources created while the mock is active. The put therefore goes to the real endpoint, which is why you see the credential errors.
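A minimal sketch of a fix, assuming the module is named s3_test.py as in the question: create the resource inside the method rather than at module level, so it is constructed while the mock is active.

# s3_test.py (sketch): build the resource per call instead of at import time
import json
import boto3

class Rivirproductlist:
    # ... __init__ unchanged ...

    def save_to_s3(self, bucket, key, json_data):
        # Created here, under @mock_aws this returns the moto-backed
        # resource; outside a test it behaves exactly as before.
        s3 = boto3.resource("s3")
        s3object = s3.Object(bucket, key)
        s3object.put(Body=json.dumps(json_data, indent=2).encode('UTF-8'))

With that change, the test above should pass, provided the bucket is created inside the @mock_aws-decorated test before save_to_s3 is called.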

Alternatively, try Adobe's S3Mock first: https://github.com/adobe/S3Mock
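With S3Mock you run a real (local) S3 endpoint in a container and point boto3 at it with endpoint_url, instead of mocking in-process. A sketch, assuming S3Mock's default HTTP port 9090 and dummy credentials (S3Mock does not validate them):

# Start the container first: docker run -p 9090:9090 adobe/s3mock
import boto3

s3 = boto3.resource(
    "s3",
    endpoint_url="http://localhost:9090",  # local S3Mock endpoint
    aws_access_key_id="dummy",             # any non-empty value works
    aws_secret_access_key="dummy",
    region_name="us-east-1",
)
s3.create_bucket(Bucket="test-bucket")

For this to help, the code under test must let you pass in the resource (or the endpoint URL) rather than building a module-level one, as in the sketch above.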

© www.soinside.com 2019 - 2024. All rights reserved.