How do we best configure the above AWS SageMaker ML model endpoint via a Glue or EMR-based Spark job?
As we can see in the AWS documentation 'here', an endpoint named 'linear-learner-2019-11-04-01-57-20-572' was created. It can be invoked as

response = client.invoke_endpoint(
    EndpointName='linear-learner-2019-11-04-01-57-20-572',
    ContentType='text/csv',
    Body=values)
However, suppose we have a batch processing job. How do we best configure the said endpoint via a Glue or EMR-based Spark job?
You can use AWS Step Functions to create an orchestration workflow and trigger each task (EMR, Glue, Athena, SageMaker, etc.) in sequence. For the batch workload itself, I would suggest looking at a SageMaker Processing job or a SageMaker Batch Transform job rather than calling a real-time endpoint row by row.
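As a sketch of the Batch Transform route: you point SageMaker at your already-trained model and an S3 prefix, and it handles the batch inference for you. The job name, model name, S3 URIs, and instance type below are all placeholders you would substitute:

```python
# Request for a SageMaker Batch Transform job. All names and
# S3 URIs here are placeholders, not values from the question.
transform_request = {
    'TransformJobName': 'linear-learner-batch-job',
    'ModelName': 'your-model-name',  # the model behind the endpoint
    'TransformInput': {
        'DataSource': {
            'S3DataSource': {
                'S3DataType': 'S3Prefix',
                'S3Uri': 's3://your-bucket/batch-input/'
            }
        },
        'ContentType': 'text/csv',
        # Split the input file into one request per line
        'SplitType': 'Line'
    },
    'TransformOutput': {
        'S3OutputPath': 's3://your-bucket/batch-output/'
    },
    'TransformResources': {
        'InstanceType': 'ml.m5.large',
        'InstanceCount': 1
    }
}

# Submitting it requires AWS credentials and an existing model:
# import boto3
# sm = boto3.client('sagemaker')
# sm.create_transform_job(**transform_request)
```

A Step Functions state machine can launch this job natively (via the `arn:aws:states:::sagemaker:createTransformJob.sync` integration), so the Glue/EMR step and the inference step can live in one workflow.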
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import SparkSession
# Initialize Spark and Glue contexts
spark_context = SparkContext()
glue_context = GlueContext(spark_context)
spark = glue_context.spark_session
# Specify your SageMaker endpoint
endpoint_name = 'your-sagemaker-endpoint-name'
# Read data from S3 into a Spark DataFrame
s3_path = 's3://your-bucket/your-data-prefix/'
data_frame = spark.read.format('parquet').load(s3_path)
# Define a UDF to invoke the SageMaker endpoint.
# Note: the UDF runs on the executors, so it creates its own boto3
# runtime client there; the sagemaker.Predictor object is not easily
# serializable across the cluster.
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def invoke_sagemaker_udf(input_data):
    import boto3
    client = boto3.client('sagemaker-runtime')
    response = client.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType='text/csv',
        Body=input_data)
    return response['Body'].read().decode('utf-8')

# Register the UDF
sagemaker_udf = udf(invoke_sagemaker_udf, StringType())
# Apply the UDF to the DataFrame ("input_data" is a placeholder
# column name; use the column holding your CSV-formatted features)
result_df = data_frame.withColumn("prediction", sagemaker_udf("input_data"))
# Perform any additional transformations if needed
# ...
# Write the result back to S3 or another destination
output_s3_path = 's3://your-bucket/your-output-prefix/'
result_df.write.format('parquet').mode('overwrite').save(output_s3_path)
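One caveat with the per-row UDF above is client overhead. A common alternative is `mapPartitions`, which lets you create one client per partition and reuse it for every row. A minimal sketch, assuming each input row is already a CSV string (the `make_client` factory parameter is an illustration device so the logic can be exercised without AWS):

```python
def predict_partition(rows, make_client):
    """Invoke the endpoint for each row in a partition, reusing one client."""
    # One client per partition, not one per row
    client = make_client()
    for row in rows:
        response = client.invoke_endpoint(
            EndpointName='your-sagemaker-endpoint-name',
            ContentType='text/csv',
            Body=row)
        yield response['Body'].read().decode('utf-8')

# With Spark you would wire it up roughly like this:
# import boto3
# predictions = data_frame.rdd \
#     .map(lambda r: r['input_data']) \
#     .mapPartitions(lambda rows: predict_partition(
#         rows, lambda: boto3.client('sagemaker-runtime')))
```

This keeps the number of boto3 clients proportional to the number of partitions rather than the number of rows, which matters when scoring millions of records.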