并行化从 lambda 函数启动的瞬态 AWS EMR 中的步骤的最佳实践是什么?

问题描述 投票:0回答:1

假设在 S3 存储桶中接收文件时调用 lambda 函数。 此 lambda 函数负责处理此文件以及在瞬态 AWS EMR 中提交的一些 Spark 作业。 Spark 作业如下:

  • 步骤 A:运行预处理管道
    • 输入:
      s3://data-bucket/raw/file.YYYY-MM-DD.HH-MM-SS.csv
    • 输出:
      s3://data-bucket/preprocessed/file.YYYY-MM-DD.HH-MM-SS.csv
  • 步骤 B1:运行特征提取管道 1
    • 输入:
      s3://data-bucket/preprocessed/file.YYYY-MM-DD.HH-MM-SS.csv
    • 输出:
      s3://data-bucket/features/pipeline1/file.YYYY-MM-DD.HH-MM-SS.csv
  • 步骤 C1:运行机器学习管道 1
    • 输入:
      s3://data-bucket/features/pipeline1/file.YYYY-MM-DD.HH-MM-SS.csv
    • 输出:
      s3://data-bucket/predictions/pipeline1/file.YYYY-MM-DD.HH-MM-SS.csv
  • 步骤 B2:运行特征提取管道 2
    • 输入:
      s3://data-bucket/preprocessed/file.YYYY-MM-DD.HH-MM-SS.csv
    • 输出:
      s3://data-bucket/features/pipeline2/file.YYYY-MM-DD.HH-MM-SS.csv
  • 步骤 C2:运行机器学习管道 2
    • 输入:
      s3://data-bucket/features/pipeline2/file.YYYY-MM-DD.HH-MM-SS.csv
    • 输出:
      s3://data-bucket/predictions/pipeline2/file.YYYY-MM-DD.HH-MM-SS.csv

我可以使用以下代码按顺序运行这些步骤(A、B1、C1、B2、C2):

import boto3

emr = boto3.client('emr')

# code to define spark_submits = {"A": ..., "B1": ..., ....}

steps = [
    {
        'Name': step_name,
        'ActionOnFailure': 'TERMINATE_CLUSTER' if step_name == 'A' else 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 's3://eu-west-1.elasticmapreduce/libs/script-runner/script-runner.jar',
            'Args': [*spark_submit_cmd[step_name]]
        }
    },
    for step_name, spark_submit_cmd in spark_submits.items()
]

response = emr.run_job_flow(
    Name='MyJobFlow',
    Steps=steps,
    # other configurations and parameters ...
)

但是,这确实很耗时,我更愿意并行化(B1,C1)和(B2,C2)。 请注意,A 必须在运行 B1/B2 之前完成,并且 B1/B2 必须在运行 C1/C2 之前完成。

如何通过最佳实践来实现这一目标?我想避免编写肮脏的 bash 脚本来实现这一点......任何帮助将不胜感激。非常感谢!

amazon-web-services amazon-s3 amazon-emr
1个回答
0
投票

根据您的顺序要求,AWS Step Functions 可能是编排的最佳选择。特别是因为您可以将所有排序逻辑保留在 Lambda 之外。

AWS 步骤函数

© www.soinside.com 2019 - 2024. All rights reserved.