将 moto 与 pandas read_parquet 和 to_parquet 函数结合使用

问题描述 投票:0回答:1

我正在尝试为使用

pd.read_parquet()
函数的函数编写单元测试,我正在努力使其工作。我有下面的代码

from moto import mock_aws
import pandas as pd
import pytest
import datetime as dt
import boto3
from my_module import foo

@pytest.fixture
def mock_df():
    cols = [
        "timestamp",
        "value"
    ]
    values = [
        [dt.datetime(2024, 1, 1, 0), 2.57],
        [dt.datetime(2024, 1, 1, 1), 1.41],
        [dt.datetime(2024, 1, 1, 2), 2.06],
    ]
    df = pd.DataFrame(values, columns=cols)
    return df


@mock_aws
def test_download(mock_df):
    bucket_name = "test-input-bucket"
    s3 = boto3.resource("s3", region_name="us-east-1")
    s3.create_bucket(Bucket=bucket_name)
    key1 = "s3://test-input-bucket/path/to/data.parquet"
    mock_df.to_parquet(key1) # code fails already here
    foo() # uses pd.read_parquet()

但我收到此错误

OSError: When initiating multiple part upload for key 'path/to/data.parquet'
in bucket 'test-input-bucket': AWS Error INVALID_ACCESS_KEY_ID during 
CreateMultipartUpload operation: The AWS Access Key Id you provided does not exist in our records.

无论我使用

to_parquet
还是尝试使用
read_parquet
,我都会遇到相同的错误。一切正常,如果我使用不同的东西进行上传和下载,比如

s3_bucket.put_object(Key=key1, Body=mock_df.to_parquet())

但是我对替换 pandas 函数不感兴趣,因为在我的情况下这是不可能的,并且需要找到一种在使用它们时模拟 S3 的方法。有没有办法让

moto
使用这些函数?

编辑: 我正在使用这些版本

boto3                                    1.28.64
botocore                                 1.31.64
moto                                     5.0.3
python pandas mocking pytest moto
1个回答
0
投票

这解决了我的问题。我并不是100%适用于每一个案例。

最终,我们确实将问题缩小到在测试中使用

to_parquet
read_parquet
。 使用
fastparquet
作为引擎 (
engine='fastparquet'
) 似乎提供了一个解决方案,但对我们来说并不总是可行。

出于某种原因,

pyarrow
想要凭证,而其他软件包则不在乎。添加凭据并在创建连接时强制它们对我们来说很有效。 所以类似


@pytest.fixture
def aws_credentials():
    """Mocked AWS Credentials for moto."""
    os.environ["AWS_ACCESS_KEY_ID"] = "testing"
    os.environ["AWS_SECRET_ACCESS_KEY"] = "testing"
    os.environ["AWS_SECURITY_TOKEN"] = "testing"
    os.environ["AWS_SESSION_TOKEN"] = "testing"


@mock_aws
def test_file(aws_credentials, mock_df):
    with mock_aws(aws_credentials):
        conn = boto3.client("s3", region_name="us-east-1")
        yield conn
    conn.create_bucket(Bucket="testbucket")

允许我们访问存储桶。 让我知道它是否也适用于您。

© www.soinside.com 2019 - 2024. All rights reserved.