从气流作业执行SparkSubmitOperator时出错

问题描述 投票:1回答:1

背景:我创建了一个新的Airflow Job / Task DAG,在其中使用了SparkSubmitOperator。我在我的桌面(以下版本等)上同时运行Spark和Airflow。 DAG可以正常工作,直到到达Spark作业的Submit部分。我尝试使用以下选项更改连接。无论我尝试什么,我都会在气流日志中收到以下消息。

气流识别出连接并尝试使用它,但失败。

如果我从命令提示符下提交目标DataPipelineExample.py,则它可以正常运行。

问题:是什么阻止了气流识别并使用该连接来触发本地火花以执行火花提交?] >>

Airflow.exceptions.AirflowException:无法执行:spark-submit --masterhttp://localhost:4040--name mySparkSubmitJob

桌面:Linux Mint VERSION =“ 19.3(Tricia)”Spark:版本2.4.5Pyspark:版本2.4.5气流:版本:1.10.9Python 3.7.4(默认,2019年8月13日,20:35:49)java版本“ 1.8.0_241”

已使用或尝试过的气流连接本地主机4040spark://本地主机4040http://localhost:4040http://specific IP地址:4040主机:localhost端口:4040 / Extras,No Extras等其他:{“ root.default” ,:“ spark_home”:“”,“ spark_binary”:“ spark-submit”,“命名空间”:“ default”}

路径信息

export SCALA_HOME=~/anaconda3/share/scala-2.11.1
export SPARK_HOME=/usr/local/spark
export PYSPARK_SUBMIT_ARGS="--master local[2] pyspark-shell"
export PATH=$PATH:/usr/local/spark/bin

在完整的DAG下面。可以编译并完全被Python和Airflow识别。

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from airflow.models import Variable
from datetime import datetime, timedelta

default_args = {
    'owner': '[email protected]',
    'depends_on_past': False,
    'start_date': datetime(2020, 3, 17),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    'end_date': datetime(2030, 3, 17),
}

dag = DAG(dag_id = 'a_data_pipelne_job', default_args=default_args, schedule_interval='*/45 * * * *')

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

print_path_env_task = BashOperator(
    task_id='print_path_env',
    bash_command='echo $PATH',
    dag=dag)

spark_submit_task = SparkSubmitOperator(
    task_id='spark_submit_job_02',
    conn_id='spark_local',
    application = "/home/me/.config/spyder-py3/DataPipelineExample.py",
    name='airflowspark-DataLoaderMongo',
    verbose=True,
    dag=dag,
)

t1.set_upstream(print_path_env_task)
spark_submit_task.set_upstream(t1)

背景:我创建了一个新的Airflow Job / Task DAG,在其中使用了SparkSubmitOperator。我在我的桌面(以下版本等)上同时运行Spark和Airflow。 DAG可以正常工作,直到...

linux apache-spark airflow pyspark-sql
1个回答
0
投票

通过使用SSHOperator,我能够解决此问题。与SparkSubmitOperator相比,它更不容易受到环境配置问题的影响。在本地pyspark主页的上下文中,通过SSH调用SparkSubmit。为您的python脚本添加path参数,就可以了。

dag = DAG(dag_id = 'a_pjm_data_pipelne__ssh_job', 
                  default_args=default_args, 
                  schedule_interval='*/60 * * * *',
                  params={'project_source': '/home/me/.config/spyder-py3',
                  'spark_submit': '/usr/local/spark/bin/spark-submit DataPipelineExample.py'})

templated_bash_command = """
    echo 'HOSTNAME: localhost' #To check that you are properly connected to the host
    cd {{ params.project_source }}
    {{ params.spark_submit }}
"""

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

print_path_env_task = BashOperator(
    task_id='print_path_env',
    bash_command='echo $PATH',
    dag=dag)

submit_spark_task = SSHOperator(
    task_id="SSH_task",
    ssh_conn_id='ssh_default',
    command=templated_bash_command,
    dag=dag
© www.soinside.com 2019 - 2024. All rights reserved.