假设在 S3 存储桶中接收文件时调用 lambda 函数。 此 lambda 函数负责处理此文件以及在瞬态 AWS EMR 中提交的一些 Spark 作业。 Spark 作业如下:
s3://data-bucket/raw/file.YYYY-MM-DD.HH-MM-SS.csv
s3://data-bucket/preprocessed/file.YYYY-MM-DD.HH-MM-SS.csv
s3://data-bucket/preprocessed/file.YYYY-MM-DD.HH-MM-SS.csv
s3://data-bucket/features/pipeline1/file.YYYY-MM-DD.HH-MM-SS.csv
s3://data-bucket/features/pipeline1/file.YYYY-MM-DD.HH-MM-SS.csv
s3://data-bucket/predictions/pipeline1/file.YYYY-MM-DD.HH-MM-SS.csv
s3://data-bucket/preprocessed/file.YYYY-MM-DD.HH-MM-SS.csv
s3://data-bucket/features/pipeline2/file.YYYY-MM-DD.HH-MM-SS.csv
s3://data-bucket/features/pipeline2/file.YYYY-MM-DD.HH-MM-SS.csv
s3://data-bucket/predictions/pipeline2/file.YYYY-MM-DD.HH-MM-SS.csv
我可以使用以下代码按顺序运行这些步骤(A、B1、C1、B2、C2):
import boto3
emr = boto3.client('emr')
# code to define spark_submits = {"A": ..., "B1": ..., ....}
steps = [
{
'Name': step_name,
'ActionOnFailure': 'TERMINATE_CLUSTER' if step_name == 'A' else 'CONTINUE',
'HadoopJarStep': {
'Jar': 's3://eu-west-1.elasticmapreduce/libs/script-runner/script-runner.jar',
'Args': [*spark_submit_cmd[step_name]]
}
},
for step_name, spark_submit_cmd in spark_submits.items()
]
response = emr.run_job_flow(
Name='MyJobFlow',
Steps=steps,
# other configurations and parameters ...
)
但是,这确实很耗时,我更愿意并行化(B1,C1)和(B2,C2)。 请注意,A 必须在运行 B1/B2 之前完成,并且 B1/B2 必须在运行 C1/C2 之前完成。
如何通过最佳实践来实现这一目标?我想避免编写肮脏的 bash 脚本来实现这一点......任何帮助将不胜感激。非常感谢!
根据您的顺序要求,AWS Step Functions 可能是编排的最佳选择。特别是因为您可以将所有排序逻辑保留在 Lambda 之外。