探索 Sagemaker Python SDK 并尝试设置最小管道:一个自定义 Python 脚本,它从 S3 文件夹读取两个 csv 文件,处理数据并将单个文件写回 S3。似乎找不到任何解释此过程的示例/文档。
网上找到的大多数示例似乎都使用 SKLearnProcessor 或 PySparkProcessor。
您的任务可以通过简单的处理工作来解决。
通过使用 SKLearnProcessor,您已经暗示了 scikit-learn 容器的使用。否则,您可以使用通用 ScriptProcessor 并指定最合适的容器(sagemaker 中已存在,例如 scikit-learn 的容器或您自己完全定制的容器之一)。
在 scikit-learn 容器中,已经存在 pandas 和 numpy 等库。 您可以在此处查看完整的要求列表。
下面是回答有关 S3 文件输入/输出问题的代码示例:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor
sklearn_processor = SKLearnProcessor(
framework_version=framework_version, # e.g. "1.0-1",
role=role,
instance_type=your_instance_type, # e.g. 'ml.m5.large'
base_job_name = your_base_job_name,
instance_count=your_instance_count, # e.g. 1
)
sklearn_processor.run(
code=your_script_path,
inputs=[
ProcessingInput(
input_name='insert-custom-name-for-first-file',
source=first_file_s3_uri,
destination="/opt/ml/processing/input/data",
s3_data_type='S3Prefix',
s3_input_mode="File"
),
ProcessingInput(
input_name='insert-custom-name-for-second-file',
source=second_file_s3_uri,
destination="/opt/ml/processing/input/data",
s3_data_type='S3Prefix',
s3_input_mode="File"
)
],
outputs=[
ProcessingOutput(
output_name="output-channel-name",
destination=output_s3_path_uri,
source="/opt/ml/processing/processed_data"
)
]
)
如果两个输入文件位于 S3 上的同一文件夹中,您可以直接加载指向该文件夹的单个 ProcessingInput,而不是分隔两个文件。但是,由于它们只有两个,我建议像我在示例中所做的那样区分它们。
对于依赖项:如果它们是要加载的模块,您可以像ProcessingInput一样传递它们。请参阅 source_dir
、
dependencies
和 git_config
下的 run() 文档。这样您就可以为您的任务选择最佳配置。
综上所述,直接使用sklearn容器是没有错的。安装一些你不需要的东西,但不是很多。如果您没有特别需要与其他库兼容,请使用这个现成的容器。否则,请使用自定义容器。
一直在尝试实现Giuseppe的答案,但到目前为止,这是我让
requirements.txt
工作的唯一方法:
文件夹结构:
parent_dir/
requirements.txt
pipelines/my_pipeline/
pipeline.py
step_processing.py
preprocess.py
helper_functions.py
step_processing.py
的一部分:
sklearn_processor = FrameworkProcessor(
estimator_cls=SKLearn,
framework_version="0.23-1",
instance_type=processing_instance_type,
instance_count=processing_instance_count,
base_job_name=base_job_name,
sagemaker_session=pipeline_session,
role=role,
)
step_args = sklearn_processor.run(
outputs=[
ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
],
code='preprocess.py',
arguments=["--input-data", input_data],
source_dir=BASE_DIR,
dependencies=['requirements.txt'],
)
如果我能让 Giuseppe 的答案起作用,这意味着我可以将
requirements.txt
和预处理代码移动到 my_pipeline/
内的子文件夹中,这将非常棒。然而,目前我只有将 requirements.txt
直接放在父目录下才能成功。