Extracting the output path of a Step Function transform job from within the Python script

Problem description  Votes: 0  Answers: 1

I have a Step Function that contains an inference/transform job, which needs to compute both the prediction results and the prediction probabilities.

{
  "Comment": "A description of my state machine for model onboarding",
  "StartAt": "Transform",
  "States": {
    "Transform": {
      "Type": "Parallel",
      "Branches": [
        {
          "StartAt": "Transform Job",
          "States": {
            "Transform Job": {
              "Type": "Task",
              "Resource": "arn:aws:states:::sagemaker:createTransformJob.sync",
              "Parameters": {
                "TransformJobName.$": "States.Format('test-transform-{}', $$.Execution.Input.currentTimestamp)",
                "ModelName": "model-name",
                "TransformInput": {
                  "DataSource": {
                    "S3DataSource": {
                      "S3DataType": "S3Prefix",
                      "S3Uri": "s3://test/input.pq"
                    }
                  },
                  "ContentType": "application/x-npy",
                  "SplitType": "None",
                  "CompressionType": "None"
                },
                "TransformOutput": {
                  "S3OutputPath": "s3://test/output"
                },
                "TransformResources": {
                  "InstanceType": "ml.m5.4xlarge",
                  "InstanceCount": 1
                },
                "BatchStrategy": "SingleRecord",
                "MaxConcurrentTransforms": 1,
                "MaxPayloadInMB": 90
              },
              "End": true
            }
          }
        }
      ],
      "End": true
    },
    "Prem Prediction": {
      "Type": "Parallel",
      "Branches": [
        {
          "StartAt": "Prem Prediction Job",
          "States": {
            "Prem Prediction Job": {
              "Type": "Task",
              "Resource": "arn:aws:states:::sagemaker:createProcessingJob.sync",
              "Parameters": {
                "ProcessingJobName.$": "States.Format('test-post-prediction-{}', $$.Execution.Input.currentTimestamp)",
                "ProcessingInputs": [
                  {
                    "InputName": "code",
                    "S3Input": {
                      "S3Uri.$": "States.Format('s3://{}/code/', $$.Execution.Input.s3Bucket)",
                      "LocalPath": "/opt/ml/processing/input/code/",
                      "S3DataType": "S3Prefix",
                      "S3InputMode": "File",
                      "S3DataDistributionType": "FullyReplicated"
                    }
                  }
                ],
                "ProcessingResources": {
                  "ClusterConfig": {
                    "InstanceType": "ml.m5.12xlarge",
                    "InstanceCount": 1,
                    "VolumeSizeInGB": 60
                  }
                },
                "NetworkConfig": {
                  "EnableNetworkIsolation": false,
                  "VpcConfig": {
                    "SecurityGroupIds": [
                      "sg-asdasd"
                    ],
                    "Subnets": [
                      "subnet-asdasd"
                    ]
                  }
                },
                "RoleArn.$": "$$.Execution.Input.roleArn",
                "Environment": {
                  "http_proxy": "http://google.com"
                },
                "AppSpecification": {
                  "ImageUri.$": "$$.Execution.Input.imageUri",
                  "ContainerArguments": [
                    "--s3bucket",
                    "test"
                  ],
                  "ContainerEntrypoint": [
                    "python3",
                    "/opt/ml/processing/input/code/premPrediction.py"
                  ]
                }
              },
              "End": true
            }
          }
        }
      ],
      "End": true
    }
  }
}
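For reference, the $$.Execution.Input.* references above (currentTimestamp, s3Bucket, roleArn, imageUri) are resolved from the JSON document passed in when the execution is started. A minimal sketch of how that input might be supplied with boto3; the state machine ARN, role ARN, image URI and timestamp format are placeholders, not values from the original post:

import json
import time

import boto3

sfn = boto3.client("stepfunctions")

# Placeholder values -- substitute the real state machine ARN, bucket, role and image
execution_input = {
    "currentTimestamp": str(int(time.time())),
    "s3Bucket": "test",
    "roleArn": "arn:aws:iam::123456789012:role/my-sagemaker-role",
    "imageUri": "123456789012.dkr.ecr.us-east-1.amazonaws.com/my-image:latest",
}

sfn.start_execution(
    stateMachineArn="arn:aws:states:us-east-1:123456789012:stateMachine:model-onboarding",
    input=json.dumps(execution_input),
)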

The Python code that runs inside the transform job looks like this:

import io
import logging

import boto3
import numpy as np
import pandas as pd


def generate_predictions(data: pd.DataFrame, data_format: str):
    """
    Return output in the data format that was received: csv, parquet, gzip, json
    """
    print('predicting')
    predictions = ScoringService.predict(data)
    print(type(predictions))
    predictions_prob = ScoringService.predict_proba(data)
    print(type(predictions_prob))
    print(predictions_prob.shape)
    print('predictions generated')
    if data_format == "text/csv":
        out = io.StringIO()
        predictions.to_csv(out, index=False)
    elif data_format in ["application/parquet","application/gzip"]:
        out = io.BytesIO()
        print('about to save {}'.format(out))
        predictions=pd.DataFrame(predictions)
    elif data_format =="application/x-npy":
        s3_client = boto3.client("s3")
        ## save prediction (bucket and key are hardcoded here; `today` is defined elsewhere in the script)
        out = io.BytesIO()
        np.save(out, predictions)
        upload_obj = io.BytesIO(out.getvalue())
        bucket_name = "test"
        s3_path = f'output/y_pred_{today}.npy'
        s3_client.upload_fileobj(upload_obj, bucket_name, s3_path)
        ## save prediction probability
        out_pred_prob = io.BytesIO()
        np.save(out_pred_prob, predictions_prob)
        upload_obj_pred_prob = io.BytesIO(out_pred_prob.getvalue())
        bucket_name = "test"
        s3_path = f'output/y_pred_prob_{today}.npy'
        s3_client.upload_fileobj(upload_obj_pred_prob, bucket_name, s3_path)
    elif data_format == "application/json":
        out = io.StringIO()
        predictions.to_json(out, index=False)
    else:
        logging.error(f"Unsupported data_format: {data_format}")
        out = None
        data_format = None

    return out, data_format

In the data_format == "application/x-npy" branch, is there a way to retrieve the S3 output path here instead of hardcoding the S3 bucket? That way I would not have to hardcode the value in the Python code and then split it apart to get the S3 bucket name.
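One possible approach, sketched here rather than taken from the original post: CreateTransformJob accepts an Environment map of variables that are set inside the transform container, so the output location could be passed from the state machine and read with os.environ instead of being hardcoded. The variable name S3_OUTPUT_PATH below is an assumption.

# In the "Transform Job" parameters the Environment map could carry the output location, e.g.
#   "Environment": { "S3_OUTPUT_PATH": "s3://test/output" }
# (or built with States.Format from $$.Execution.Input, as done for the other parameters)

import os
from urllib.parse import urlparse


def get_output_location():
    # Read the value injected into the container by the transform job;
    # the variable name S3_OUTPUT_PATH is an assumption, not part of the original post
    s3_output_path = os.environ.get("S3_OUTPUT_PATH", "")
    parsed = urlparse(s3_output_path)      # e.g. s3://test/output
    bucket_name = parsed.netloc            # "test"
    prefix = parsed.path.lstrip("/")       # "output"
    return bucket_name, prefix

With this in place, bucket_name and prefix could replace the hardcoded "test" and "output/..." values in the x-npy branch.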

python amazon-sagemaker aws-step-functions
1 Answer

0 votes

Can you confirm that the generate_predictions function runs in your batch transform job, and not in the processing job mentioned in the Step Function?

If so, why are you uploading the data to S3 yourself in the script? SageMaker Batch Transform will do that for you.

You specify the S3 location in S3OutputPath; I see you have specified the following. As long as you point it at a bucket you own in your account, Batch Transform will automatically upload the output to S3 on your behalf.

     "TransformOutput": {
                  "S3OutputPath": "s3://test/output"
                },
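In the usual Batch Transform flow, the container only has to return the serialized predictions from its /invocations handler; SageMaker then writes the response for each input object under S3OutputPath, e.g. s3://test/output/input.pq.out, so no boto3 upload is needed in the script. A minimal Flask-style sketch; the predictor module, the parquet request payload and the response format are assumptions, not details from the original post:

import io

import flask
import numpy as np
import pandas as pd

from predictor import ScoringService  # assumed: the module that defines ScoringService in the container

app = flask.Flask(__name__)


@app.route("/invocations", methods=["POST"])
def invocations():
    # Deserialize the request body (assumed here to be parquet, to match input.pq;
    # adjust to the ContentType the job actually sends)
    data = pd.read_parquet(io.BytesIO(flask.request.data))

    predictions = ScoringService.predict(data)

    # Serialize and return; SageMaker uploads this response to S3OutputPath as <input-key>.out
    buf = io.BytesIO()
    np.save(buf, np.asarray(predictions))
    return flask.Response(response=buf.getvalue(), mimetype="application/x-npy")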