I have a Step Function with an inference/transform job, and it needs to compute both the prediction results and the prediction probabilities.
{
  "Comment": "A description of my state machine for model onboarding",
  "StartAt": "Transform",
  "States": {
    "Transform": {
      "Type": "Parallel",
      "Branches": [
        {
          "StartAt": "Transform Job",
          "States": {
            "Transform Job": {
              "Type": "Task",
              "Resource": "arn:aws:states:::sagemaker:createTransformJob.sync",
              "Parameters": {
                "TransformJobName.$": "States.Format('test-transform-{}', $$.Execution.Input.currentTimestamp)",
                "ModelName": "model-name",
                "TransformInput": {
                  "DataSource": {
                    "S3DataSource": {
                      "S3DataType": "S3Prefix",
                      "S3Uri": "s3://test/input.pq"
                    }
                  },
                  "ContentType": "application/x-npy",
                  "SplitType": "None",
                  "CompressionType": "None"
                },
                "TransformOutput": {
                  "S3OutputPath": "s3://test/output"
                },
                "TransformResources": {
                  "InstanceType": "ml.m5.4xlarge",
                  "InstanceCount": 1
                },
                "BatchStrategy": "SingleRecord",
                "MaxConcurrentTransforms": 1,
                "MaxPayloadInMB": 90
              },
              "End": true
            }
          }
        }
      ],
      "Next": "Prem Prediction"
    },
    "Prem Prediction": {
      "Type": "Parallel",
      "Branches": [
        {
          "StartAt": "Prem Prediction Job",
          "States": {
            "Prem Prediction Job": {
              "Type": "Task",
              "Resource": "arn:aws:states:::sagemaker:createProcessingJob.sync",
              "Parameters": {
                "ProcessingJobName.$": "States.Format('test-post-prediction-{}', $$.Execution.Input.currentTimestamp)",
                "ProcessingInputs": [
                  {
                    "InputName": "code",
                    "S3Input": {
                      "S3Uri.$": "States.Format('s3://{}/code/', $$.Execution.Input.s3Bucket)",
                      "LocalPath": "/opt/ml/processing/input/code/",
                      "S3DataType": "S3Prefix",
                      "S3InputMode": "File",
                      "S3DataDistributionType": "FullyReplicated"
                    }
                  }
                ],
                "ProcessingResources": {
                  "ClusterConfig": {
                    "InstanceType": "ml.m5.12xlarge",
                    "InstanceCount": 1,
                    "VolumeSizeInGB": 60
                  }
                },
                "NetworkConfig": {
                  "EnableNetworkIsolation": false,
                  "VpcConfig": {
                    "SecurityGroupIds": [
                      "sg-asdasd"
                    ],
                    "Subnets": [
                      "subnet-asdasd"
                    ]
                  }
                },
                "RoleArn.$": "$$.Execution.Input.roleArn",
                "Environment": {
                  "http_proxy": "http://google.com"
                },
                "AppSpecification": {
                  "ImageUri.$": "$$.Execution.Input.imageUri",
                  "ContainerArguments": [
                    "--s3bucket",
                    "test"
                  ],
                  "ContainerEntrypoint": [
                    "python3",
                    "/opt/ml/processing/input/code/premPrediction.py"
                  ]
                }
              },
              "End": true
            }
          }
        }
      ],
      "End": true
    }
  }
}
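For context, the `$$.Execution.Input.*` paths in the definition resolve against the JSON document passed when the execution is started. A minimal sketch of kicking off this state machine, where the key names mirror the definition above but the timestamp format and all ARN/name values are placeholder assumptions:

```python
import json
from datetime import datetime, timezone


def build_execution_input(s3_bucket: str, role_arn: str, image_uri: str) -> str:
    """Assemble the execution input read via $$.Execution.Input.* above.

    Key names (currentTimestamp, s3Bucket, roleArn, imageUri) mirror the
    state machine; the timestamp format is an assumption.
    """
    payload = {
        "currentTimestamp": datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S"),
        "s3Bucket": s3_bucket,
        "roleArn": role_arn,
        "imageUri": image_uri,
    }
    return json.dumps(payload)


def start_onboarding(state_machine_arn: str, **kwargs) -> str:
    """Start the state machine (needs AWS credentials; boto3 is imported
    lazily so build_execution_input stays testable offline)."""
    import boto3  # deferred: only required when actually calling AWS

    sfn = boto3.client("stepfunctions")
    resp = sfn.start_execution(
        stateMachineArn=state_machine_arn,
        input=build_execution_input(**kwargs),
    )
    return resp["executionArn"]
```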
The Python code that runs in the transform job looks like this:
import io
import logging

import boto3
import numpy as np
import pandas as pd

# `ScoringService` and `today` are defined elsewhere in the scoring script.


def generate_predictions(data: pd.DataFrame, data_format: str):
    """
    Return output in the data format that was received: csv, parquet, gzip, json, npy.
    """
    print('predicting')
    predictions = ScoringService.predict(data)
    print(type(predictions))
    predictions_prob = ScoringService.predict_proba(data)
    print(type(predictions_prob))
    print(predictions_prob.shape)
    print('predictions generated')
    if data_format == "text/csv":
        out = io.StringIO()
        predictions.to_csv(out, index=False)
    elif data_format in ["application/parquet", "application/gzip"]:
        out = io.BytesIO()
        print('about to save {}'.format(out))
        predictions = pd.DataFrame(predictions)
        predictions.to_parquet(out, index=False)  # actually write the frame into `out`
    elif data_format == "application/x-npy":
        s3_client = boto3.client("s3")
        bucket_name = "test"
        # save prediction
        out = io.BytesIO()
        np.save(out, predictions)
        upload_obj = io.BytesIO(out.getvalue())
        s3_path = f'output/y_pred_{today}.npy'
        s3_client.upload_fileobj(upload_obj, bucket_name, s3_path)
        # save prediction probability
        out_pred_prob = io.BytesIO()
        np.save(out_pred_prob, predictions_prob)
        upload_obj_pred_prob = io.BytesIO(out_pred_prob.getvalue())
        s3_path = f'output/y_pred_prob_{today}.npy'
        s3_client.upload_fileobj(upload_obj_pred_prob, bucket_name, s3_path)
    elif data_format == "application/json":
        out = io.StringIO()
        # index=False is only valid for orient="split"/"table", so use an explicit orient
        predictions.to_json(out, orient="records")
    else:
        logging.error(f"Unsupported data_format: {data_format}")
        out = None
        data_format = None
    return out, data_format
In the `data_format == "application/x-npy"` branch, is there a way to retrieve the S3 output path instead of hard-coding the S3 bucket? That way I would not have to hard-code the value in the Python code and then split it to extract the bucket name.
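One option, sketched below: pass the bucket into the container through an environment variable and read it with `os.environ`. The variable name `S3_OUTPUT_BUCKET` is a hypothetical choice; it would be set through the job's `Environment` map in the state machine (e.g. `"S3_OUTPUT_BUCKET.$": "$$.Execution.Input.s3Bucket"`), the same mechanism the processing job already uses for `http_proxy`:

```python
import os
from datetime import date


def resolve_output_location(default_bucket: str = "test"):
    """Return (bucket, key) for the prediction artifact.

    S3_OUTPUT_BUCKET is a hypothetical variable injected via the job's
    "Environment" map; the fallback keeps the current hard-coded value.
    """
    bucket = os.environ.get("S3_OUTPUT_BUCKET", default_bucket)
    key = f"output/y_pred_{date.today()}.npy"
    return bucket, key
```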
Can you confirm that the generate_predictions function runs in your batch transform job, and not in the processing job mentioned in the Step Function?
If so, why are you uploading the data to S3 in the script? SageMaker Batch Transform will do that for you.
You specify the S3 location in S3OutputPath; I see you have specified the following. You should point it at a bucket you own in your account, and Batch Transform will automatically upload the output to S3 on your behalf.
"TransformOutput": {
"S3OutputPath": "s3://test/output"
},
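In other words, if the container's invocation handler simply returns the serialized bytes, Batch Transform writes them under `S3OutputPath` itself and no boto3 upload is needed in the scoring script. A sketch of that idea, packing both arrays into a single `.npz` payload (`serialize_npy` is a hypothetical helper, not part of the original script):

```python
import io

import numpy as np


def serialize_npy(predictions, predictions_prob) -> bytes:
    """Pack predictions and probabilities into one .npz payload.

    Returning these bytes from the container's response lets Batch
    Transform upload them to S3OutputPath on your behalf.
    """
    out = io.BytesIO()
    np.savez(
        out,
        y_pred=np.asarray(predictions),
        y_pred_prob=np.asarray(predictions_prob),
    )
    return out.getvalue()
```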