我无法为我的 Spark 应用程序设置
environment variables
。我正在使用 AWS EMR
来运行 Spark 应用程序。这更像是我在 Spark 之上用 Python 编写的一个框架,用于根据存在的环境变量运行多个 Spark 作业。因此,为了让我开始准确的工作,我需要将环境变量传递到spark-submit
。我尝试了几种方法来做到这一点。但它们都不起作用。当我尝试打印应用程序内环境变量的值时,它返回空。
要在 EMR 中运行集群,我使用以下 AWS CLI 命令
aws emr create-cluster --applications Name=Hadoop Name=Hive Name=Spark --ec2-attributes '{"KeyName":"<Key>","InstanceProfile":"<Profile>","SubnetId":"<Subnet-Id>","EmrManagedSlaveSecurityGroup":"<Group-Id>","EmrManagedMasterSecurityGroup":"<Group-Id>"}' --release-label emr-5.13.0 --log-uri 's3n://<bucket>/elasticmapreduce/' --bootstrap-action 'Path="s3://<bucket>/bootstrap.sh"' --steps file://./.envs/steps.json --instance-groups '[{"InstanceCount":1,"InstanceGroupType":"MASTER","InstanceType":"c4.xlarge","Name":"Master"}]' --configurations file://./.envs/Production.json --ebs-root-volume-size 64 --service-role EMRRole --enable-debugging --name 'Application' --auto-terminate --scale-down-behavior TERMINATE_AT_TASK_COMPLETION --region <region>
现在
Production.json
看起来像这样:
[
{
"Classification": "yarn-env",
"Properties": {},
"Configurations": [
{
"Classification": "export",
"Properties": {
"FOO": "bar"
}
}
]
},
{
"Classification": "spark-defaults",
"Properties": {
"spark.executor.memory": "2800m",
"spark.driver.memory": "900m"
}
}
]
还有
steps.json
,像这样:
[
{
"Name": "Job",
"Args": [
"--deploy-mode","cluster",
"--master","yarn","--py-files",
"s3://<bucket>/code/dependencies.zip",
"s3://<bucket>/code/__init__.py",
"--conf", "spark.yarn.appMasterEnv.SPARK_YARN_USER_ENV=SHAPE=TRIANGLE",
"--conf", "spark.yarn.appMasterEnv.SHAPE=RECTANGLE",
"--conf", "spark.executorEnv.SHAPE=SQUARE"
],
"ActionOnFailure": "CONTINUE",
"Type": "Spark"
}
]
当我尝试访问
__init__.py
代码中的环境变量时,它只是打印空。正如您所看到的,我正在使用 Spark 和 yarn cluster
中的 cluster mode
运行该步骤。我通过这些链接到达了这个位置。
使用分类yarn-env将环境变量传递给工作节点。
使用分类spark-env将环境变量传递给驱动程序,具有部署模式客户端。当使用部署模式集群时,使用yarn-env。
为了使用 EMR 集群,我使用 AWS Lambda 创建一个项目,在条件中设置标志时构建 EMR 集群。 在这个项目中,我们定义了您可以在 Lambda 中设置的变量,然后将其替换为其值。要使用它,我们必须使用 AWS API。您必须使用的可能方法是
AWSSimpleSystemsManagement.getParameters
。
然后,制作一个像 val parametersValues = parameterResult.getParameters.asScala.map(k => (k.getName, k.getValue))
这样的地图,以包含一个包含其名称和值的元组。
例如:
${BUCKET} = "s3://bucket-name/
这意味着,您只需在 JSON ${BUCKET} 中写入路径的所有名称即可。
一旦替换了值,步骤 JSON 就可以有这样的视图,
[
{
"Name": "Job",
"Args": [
"--deploy-mode","cluster",
"--master","yarn","--py-files",
"${BUCKET}/code/dependencies.zip",
"${BUCKET}/code/__init__.py",
"--conf", "spark.yarn.appMasterEnv.SPARK_YARN_USER_ENV=SHAPE=TRIANGLE",
"--conf", "spark.yarn.appMasterEnv.SHAPE=RECTANGLE",
"--conf", "spark.executorEnv.SHAPE=SQUARE"
],
"ActionOnFailure": "CONTINUE",
"Type": "Spark"
}
]