如何使用boto3(或其他方式)在emr上自动执行pyspark作业?

问题描述 投票:10回答:3

我正在创建一个解析大量服务器数据的工作,然后将其上传到Redshift数据库。

我的工作流程如下:

  • 从S3获取日志数据
  • 使用spark dataframes或spark sql来解析数据并写回S3
  • 将数据从S3上传到Redshift。

我已经开始讨论如何自动执行此操作,以便我的进程旋转EMR集群,引导正确的安装程序,并运行我的python脚本,其中包含用于解析和编写的代码。

有没有人有任何可以与我分享的示例,教程或经验,以帮助我学习如何做到这一点?

python amazon-s3 apache-spark pyspark amazon-emr
3个回答
19
投票

看看boto3 EMR docs来创建集群。你基本上必须调用run_job_flow并创建运行所需程序的步骤。

import boto3    

client = boto3.client('emr', region_name='us-east-1')

S3_BUCKET = 'MyS3Bucket'
S3_KEY = 'spark/main.py'
S3_URI = 's3://{bucket}/{key}'.format(bucket=S3_BUCKET, key=S3_KEY)

# upload file to an S3 bucket
s3 = boto3.resource('s3')
s3.meta.client.upload_file("myfile.py", S3_BUCKET, S3_KEY)

response = client.run_job_flow(
    Name="My Spark Cluster",
    ReleaseLabel='emr-4.6.0',
    Instances={
        'MasterInstanceType': 'm4.xlarge',
        'SlaveInstanceType': 'm4.xlarge',
        'InstanceCount': 4,
        'KeepJobFlowAliveWhenNoSteps': True,
        'TerminationProtected': False,
    },
    Applications=[
        {
            'Name': 'Spark'
        }
    ],
    BootstrapActions=[
        {
            'Name': 'Maximize Spark Default Config',
            'ScriptBootstrapAction': {
                'Path': 's3://support.elasticmapreduce/spark/maximize-spark-default-config',
            }
        },
    ],
    Steps=[
    {
        'Name': 'Setup Debugging',
        'ActionOnFailure': 'TERMINATE_CLUSTER',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['state-pusher-script']
        }
    },
    {
        'Name': 'setup - copy files',
        'ActionOnFailure': 'CANCEL_AND_WAIT',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['aws', 's3', 'cp', S3_URI, '/home/hadoop/']
        }
    },
    {
        'Name': 'Run Spark',
        'ActionOnFailure': 'CANCEL_AND_WAIT',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['spark-submit', '/home/hadoop/main.py']
        }
    }
    ],
    VisibleToAllUsers=True,
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole'
)

如果您知道作业流ID,还可以向正在运行的集群添加步骤:

job_flow_id = response['JobFlowId']
print("Job flow ID:", job_flow_id)

step_response = client.add_job_flow_steps(JobFlowId=job_flow_id, Steps=SomeMoreSteps)

step_ids = step_response['StepIds']

print("Step IDs:", step_ids)

有关更多配置,请查看sparksteps


1
投票

只需使用AWS Data Pipeline这样做。每次将新文件放入存储桶https://docs.aws.amazon.com/lambda/latest/dg/with-s3-example.html时,您可以设置S3存储桶以触发lambda函数。然后你的Lambda函数将激活你的数据管道https://aws.amazon.com/blogs/big-data/using-aws-lambda-for-event-driven-data-processing-pipelines/然后你的数据管道使用EmrCluster旋转一个新的EMR集群,然后你可以指定你的引导选项,然后你可以使用EmrActivity运行你的EMR命令,当它全部完成时它将终止您的EMR群集并停用数据管道。


0
投票

实际上,我已经使用了AWS的Step Functions,它是Lambda函数的状态机包装器,因此您可以使用boto3使用run_job_flow启动EMR Spark作业,并且可以使用describe_cluaster来获取集群的状态。最后使用choice。所以你的步骤函数看起来像这样(括号中的步骤函数类型:

运行作业(任务) - >等待X分钟(等待) - >检查状态(任务) - >分支(选择)[=>回到等待,或=>完成]

© www.soinside.com 2019 - 2024. All rights reserved.