我想提交一个 spark-sql 作业,通过 Airflow 在 EKS 上的 EMR 上运行。我在 AWS 文档中发现了新的 sparkSqlJobDriver 参数以及一个相关用例。
我尝试按如下方式在我的 DAG 中实现它,但我得到一个错误:**Unknown parameter in jobDriver: "sparkSqlJobDriver", must be one of: sparkSubmitJobDriver**。使用旧的 sparkSubmitJobDriver 则可以正常工作。
此外,还有一些关于我正在做的事情的背景信息。我将我的 docker 镜像推送到 ECR,并在我的配置覆盖中引用它,以便能够在本地获取代码。
任何帮助将不胜感激!!
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.amazon.aws.operators.emr import EmrContainerOperator
from datetime import datetime, timedelta
from airflow.models import Variable
import json
# EMR-on-EKS connection details, configured as Airflow Variables in the UI/CLI.
VIRTUAL_CLUSTER_ID = Variable.get("emr_virtual_cluster_id")
JOB_ROLE_ARN = Variable.get("emr_execution_role_arn")

# Spark configuration passed to the job via EMR-on-EKS configuration overrides.
# spark.kubernetes.container.image points at the custom ECR image so the
# locally-baked SQL script is available inside the driver/executor pods.
CONFIGURATION_OVERRIDES_ARG = {
    "applicationConfiguration": [
        {
            "classification": "spark-defaults",
            "properties": {
                "spark.kubernetes.container.image": "image-name",
                "spark.sql.catalogImplementation": "hive",
                "spark.hadoop.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
                "spark.sql.shuffle.partitions": "80",
                "spark.default.parallelism": "80",
                "spark.driver.cores": "3",
                "spark.driver.memory": "16g",
                "spark.executor.cores": "4",
                "spark.executor.memory": "16g",
                "spark.dynamicAllocation.enabled": "true",
                "spark.sql.sources.partitionOverwriteMode": "dynamic",
                "spark.dynamicAllocation.shuffleTracking.enabled": "true",
                "spark.dynamicAllocation.initialExecutors": "1",
                "spark.dynamicAllocation.minExecutors": "1",
                "spark.dynamicAllocation.maxExecutors": "20",
                "spark.memory.offHeap.enabled": "true",
                "spark.memory.offHeap.size": "2g",
                "spark.executor.memoryOverhead": "8g",
                "spark.driver.memoryOverhead": "8g",
            },
        }
    ],
    "monitoringConfiguration": {
        "cloudWatchMonitoringConfiguration": {
            "logGroupName": "/aws/caspian-emr-eks-spark",
            "logStreamNamePrefix": "airflow",
        }
    },
}

# sparkSqlJobDriver runs a .sql file directly (no spark-submit entry class).
# NOTE(review): this driver type is only accepted by EMR release emr-6.7.0
# and later, and the AWS CLI on the worker must be recent enough to know the
# parameter — an old CLI rejects it with "Unknown parameter: sparkSqlJobDriver".
job_driver = {
    "sparkSqlJobDriver": {
        "entryPoint": "local:///sql_test_repo/test_sparkSQL_job.sql"
    }
}

# Compact separators keep the serialized JSON free of spaces; the default
# json.dumps output ("key": "value", ...) contains spaces that the shell
# would split into separate CLI arguments when interpolated unquoted.
config_overrides = json.dumps(CONFIGURATION_OVERRIDES_ARG, separators=(",", ":"))
job_driver_string = json.dumps(job_driver, separators=(",", ":"))
with DAG(
    'sparkSQLtest',
    start_date=datetime(2021, 9, 19),
    # BUG FIX: @once is a cron-preset STRING; bare `@once` is a Python
    # syntax error (decorator syntax is not valid in an argument list).
    schedule_interval='@once',
    catchup=False,
) as dag:
    # Submit the SQL job with the AWS CLI rather than EmrContainerOperator,
    # which (in this provider version) does not expose sparkSqlJobDriver.
    #
    # BUG FIX: "Unknown parameter: sparkSqlJobDriver" has two causes:
    #   1. sparkSqlJobDriver requires release emr-6.7.0-latest or newer;
    #      emr-6.4.0 only supports sparkSubmitJobDriver.
    #   2. The AWS CLI on the Airflow worker must be new enough to know the
    #      parameter (upgrade awscli if the error persists).
    create_schema = BashOperator(
        task_id='sparkSQL_test',
        bash_command=(
            f'aws emr-containers start-job-run '
            f'--virtual-cluster-id {VIRTUAL_CLUSTER_ID} '
            f'--name sparksql-create-table '
            f'--execution-role-arn {JOB_ROLE_ARN} '
            f'--release-label emr-6.7.0-latest '
            f"--job-driver '{job_driver_string}' "
            # BUG FIX: the overrides JSON must be single-quoted too —
            # unquoted, the shell splits it on whitespace/quotes and the
            # CLI receives mangled arguments.
            f"--configuration-overrides '{config_overrides}'"
        ),
        # dag=dag removed: tasks created inside `with DAG(...)` are
        # attached to the context DAG automatically.
    )