I'm new to Spark and Airflow, and I'm trying to create a DAG that runs a spark-submit job in PySpark.
On my Ubuntu system I created a user named 'hadoopusr', under which I run spark-submit manually. All environment variables are set in that user's ~/.bashrc.
When I run spark-submit manually from the terminal, the job completes successfully.
I created a sample DAG file that looks like this:
"""
Apache Airflow DAG Script: Ingestion MVP basic version
"""
# imports
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
import os
import sys

# Paths
os.environ['SPARK_HOME'] = '/usr/local/spark/spark-2.4.4-bin-hadoop2.7'
sys.path.append(os.path.join(os.environ['SPARK_HOME'], 'bin'))

# Parameters
filename = "CDC_DAY1"
file_extension = "csv"
batch_id = "20191203"

# Define the DAG object
default_args = {
    'owner': 'hadoopusr',
    'depends_on_past': False,
    'start_date': datetime(2019, 12, 3),
    'retries': 5,
    'retry_delay': timedelta(minutes=1),
}
dag = DAG('MVP_BASIC_2', default_args=default_args, schedule_interval=timedelta(days=1))

# List of tasks
LandingToRaw = BashOperator(
    task_id='Landing_to_Raw',
    bash_command="spark-submit --master yarn-client /home/soham/Documents/mvp_landing_raw_1.py " + filename + " " + file_extension + " " + batch_id,
    dag=dag,
    run_as_user='hadoopusr')

RawToStaging = BashOperator(
    task_id='Raw_to_Staging',
    bash_command="spark-submit --master yarn-client /home/soham/Documents/mvp_raw_staging_1.py " + filename + " " + file_extension + " " + batch_id + " FULL emp_id",
    dag=dag,
    run_as_user='hadoopusr')

# Dependencies
LandingToRaw >> RawToStaging
When I test the first task of the DAG under the same user (hadoopusr), it throws an exception like this:
Running command: spark-submit --master yarn-client /home/soham/Documents/mvp_landing_raw_1.py CDC_DAY1 csv 20191203
[2019-12-04 19:27:52,527] {bash_operator.py:124} INFO - Output:
[2019-12-04 19:27:54,937] {bash_operator.py:128} INFO - Exception in thread "main" org.apache.spark.SparkException: When running with master 'yarn-client' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.
[2019-12-04 19:27:54,938] {bash_operator.py:128} INFO - at org.apache.spark.deploy.SparkSubmitArguments.error(SparkSubmitArguments.scala:657)
[2019-12-04 19:27:54,938] {bash_operator.py:128} INFO - at org.apache.spark.deploy.SparkSubmitArguments.validateSubmitArguments(SparkSubmitArguments.scala:290)
[2019-12-04 19:27:54,938] {bash_operator.py:128} INFO - at org.apache.spark.deploy.SparkSubmitArguments.validateArguments(SparkSubmitArguments.scala:251)
[2019-12-04 19:27:54,938] {bash_operator.py:128} INFO - at org.apache.spark.deploy.SparkSubmitArguments.<init>(SparkSubmitArguments.scala:120)
[2019-12-04 19:27:54,939] {bash_operator.py:128} INFO - at org.apache.spark.deploy.SparkSubmit$$anon$2$$anon$1.<init>(SparkSubmit.scala:907)
[2019-12-04 19:27:54,939] {bash_operator.py:128} INFO - at org.apache.spark.deploy.SparkSubmit$$anon$2.parseArguments(SparkSubmit.scala:907)
[2019-12-04 19:27:54,939] {bash_operator.py:128} INFO - at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:81)
[2019-12-04 19:27:54,939] {bash_operator.py:128} INFO - at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
[2019-12-04 19:27:54,940] {bash_operator.py:128} INFO - at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
[2019-12-04 19:27:54,940] {bash_operator.py:128} INFO - at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
[2019-12-04 19:27:54,973] {bash_operator.py:132} INFO - Command exited with return code 1
[2019-12-04 19:27:54,987] {taskinstance.py:1058} ERROR - Bash command failed
Traceback (most recent call last):
File "/usr/local/lib/python3.6/dist-packages/airflow/models/taskinstance.py", line 930, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.6/dist-packages/airflow/operators/bash_operator.py", line 136, in execute
raise AirflowException("Bash command failed")
airflow.exceptions.AirflowException: Bash command failed
[2019-12-04 19:27:54,990] {taskinstance.py:1081} INFO - Marking task as UP_FOR_RETRY
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 37, in <module>
args.func(args)
File "/usr/local/lib/python3.6/dist-packages/airflow/utils/cli.py", line 74, in wrapper
return f(*args, **kwargs)
File "/usr/local/lib/python3.6/dist-packages/airflow/bin/cli.py", line 688, in test
ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
File "/usr/local/lib/python3.6/dist-packages/airflow/utils/db.py", line 74, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.6/dist-packages/airflow/models/taskinstance.py", line 1020, in run
session=session)
File "/usr/local/lib/python3.6/dist-packages/airflow/utils/db.py", line 70, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.6/dist-packages/airflow/models/taskinstance.py", line 930, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.6/dist-packages/airflow/operators/bash_operator.py", line 136, in execute
raise AirflowException("Bash command failed")
airflow.exceptions.AirflowException: Bash command failed
Am I going wrong somewhere in the configuration?
You are trying to run Spark in yarn-client mode, which requires a YARN configuration. If you are using YARN, set YARN_CONF_DIR (or HADOOP_CONF_DIR) to the path of your YARN configuration directory in the environment the Airflow worker actually sees; setting it only in ~/.bashrc is not enough, because tasks launched by Airflow do not run through an interactive login shell. Alternatively, if you do not need YARN, you can run Spark with a local master: --master local[num_of_threads] or --master local[*].
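One way to handle this from inside Airflow is to export the configuration directories in the bash_command itself, so the task's shell has them no matter how the worker was started. A minimal sketch, assuming a hypothetical Hadoop config location of /usr/local/hadoop/etc/hadoop (adjust to your installation):

```python
# Sketch: build a bash_command that exports the YARN config dirs before
# calling spark-submit.  The config path below is an assumption.
hadoop_conf = "/usr/local/hadoop/etc/hadoop"  # hypothetical path

bash_command = (
    "export HADOOP_CONF_DIR={conf} && "
    "export YARN_CONF_DIR={conf} && "
    "spark-submit --master yarn --deploy-mode client "
    "/home/soham/Documents/mvp_landing_raw_1.py CDC_DAY1 csv 20191203"
).format(conf=hadoop_conf)

print(bash_command)
```

This string can be passed to BashOperator exactly as in your DAG; BashOperator also accepts an `env` dict if you prefer to set the variables outside the command string. Note that `--master yarn --deploy-mode client` is the current spelling of the deprecated `yarn-client` master in Spark 2.x.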