Running spark-sql jobs on EMR on EKS


I want to submit a spark-sql job to run on EMR on EKS via Airflow. I found the new sparkSqlJobDriver on AWS, and a use case here.

I tried implementing it in my DAG as shown below, but I get an **Unknown parameter: "sparkSqlJobDriver", must be one of: sparkSubmitJobDriver** error for the jobDriver. The same setup works fine with the old sparkSubmitJobDriver.
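One thing worth noting (an assumption on my part, not something I have confirmed against the docs): that "Unknown parameter" message is raised by client-side request validation in botocore/the AWS CLI, so it can simply mean the installed SDK predates the SparkSQL job driver, independent of what the service itself supports. A quick diagnostic sketch to check what the locally installed botocore actually models:

# Diagnostic sketch: list the job-driver variants known to the installed
# botocore. If 'sparkSqlJobDriver' is absent from the output, the SDK/CLI
# is too old to even build the request, whatever the EMR release supports.
import botocore.session

session = botocore.session.get_session()
model = session.get_service_model("emr-containers")
job_driver_shape = model.operation_model("StartJobRun").input_shape.members["jobDriver"]
print(sorted(job_driver_shape.members))  # hoping to see 'sparkSqlJobDriver' listed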

For some background on what I'm doing: I push my Docker image to ECR and reference it in my configuration overrides, so the job code is available locally inside the container.

Any help would be much appreciated!!


from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.models import Variable
from datetime import datetime
import json


VIRTUAL_CLUSTER_ID = Variable.get("emr_virtual_cluster_id")
JOB_ROLE_ARN = Variable.get("emr_execution_role_arn")


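# Configuration overrides: point Spark at the ECR image and at the Glue
# Data Catalog as the Hive metastore, plus sizing/dynamic-allocation settings.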
CONFIGURATION_OVERRIDES_ARG = {
    "applicationConfiguration": [
        {
            "classification": "spark-defaults",
            "properties": {
                "spark.kubernetes.container.image": "image-name", 
                "spark.sql.catalogImplementation": "hive",
                "spark.hadoop.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
                "spark.sql.shuffle.partitions": "80",
                "spark.default.parallelism": "80",
                "spark.driver.cores": "3",
                "spark.driver.memory": "16g",
                "spark.executor.cores": "4",
                "spark.executor.memory": "16g",
                "spark.dynamicAllocation.enabled": "true",
                "spark.sql.sources.partitionOverwriteMode": "dynamic",
                "spark.dynamicAllocation.shuffleTracking.enabled": "true",
                "spark.dynamicAllocation.initialExecutors": "1",
                "spark.dynamicAllocation.minExecutors": "1",
                "spark.dynamicAllocation.maxExecutors": "20",
                "spark.memory.offHeap.enabled": "true",
                "spark.memory.offHeap.size": "2g",
                "spark.executor.memoryOverhead": "8g",
                "spark.driver.memoryOverhead": "8g"

            },
        }
    ],
    "monitoringConfiguration": {
        "cloudWatchMonitoringConfiguration": {
            "logGroupName": "/aws/caspian-emr-eks-spark",
            "logStreamNamePrefix": "airflow",
        }
    },
}


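# The new-style SQL job driver: entryPoint is a .sql script shipped inside
# the image (hence the local:// scheme).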
job_driver = {
  "sparkSqlJobDriver": {
      "entryPoint": "local:///sql_test_repo/test_sparkSQL_job.sql"
      }
}

config_overrides = json.dumps(CONFIGURATION_OVERRIDES_ARG)
job_driver_string = json.dumps(job_driver)


with DAG(
    'sparkSQLtest',
    start_date=datetime(2021, 9, 19),
    schedule_interval='@once',  # schedule presets must be quoted strings
    catchup=False,
) as dag:

    create_schema = BashOperator(
        task_id='sparkSQL_test',
        # Both JSON payloads need single quotes so the shell passes each
        # one through as a single argument.
        bash_command=f'aws emr-containers start-job-run \
                          --virtual-cluster-id {VIRTUAL_CLUSTER_ID} \
                          --name sparksql-create-table \
                          --execution-role-arn {JOB_ROLE_ARN} \
                          --release-label emr-6.4.0-latest \
                          --job-driver \'{job_driver_string}\' \
                          --configuration-overrides \'{config_overrides}\'',
    )
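In case it helps, here is the same submission sketched directly through boto3 instead of shelling out, which also sidesteps the JSON-quoting issues in the bash command. This is a minimal sketch under two assumptions I have not verified: that the worker's boto3 is recent enough to model sparkSqlJobDriver, and that SQL jobs require a newer release label than emr-6.4.0-latest (the docs seem to suggest 6.7.0+).

# Hypothetical direct submission via boto3; reuses the dicts defined above.
import boto3

emr = boto3.client("emr-containers")
response = emr.start_job_run(
    name="sparksql-create-table",
    virtualClusterId=VIRTUAL_CLUSTER_ID,
    executionRoleArn=JOB_ROLE_ARN,
    releaseLabel="emr-6.7.0-latest",  # assumption: SQL jobs need a newer release
    jobDriver=job_driver,                       # plain dicts, no json.dumps needed
    configurationOverrides=CONFIGURATION_OVERRIDES_ARG,
)
print(response["id"])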

Tags: amazon-web-services, apache-spark, apache-spark-sql, airflow, amazon-emr