Apache Airflow - spark-submit fails - "When running with master 'yarn-client' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment"

Problem description

I am new to Spark and Airflow, and I am trying to create a DAG that runs a spark-submit job for a PySpark script.

On my Ubuntu system I created a user named 'hadoopusr', and I run spark-submit manually as that user. All environment variables are set in that user's ~/.bashrc.

When I run spark-submit manually from the terminal, the job runs successfully.

I created a sample DAG file that looks like this:

"""
Apache Airflow DAG Script: Ingestion MVP basic version
"""
#imports
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
import os
import sys
#Paths
os.environ['SPARK_HOME'] = '/usr/local/spark/spark-2.4.4-bin-hadoop2.7'
sys.path.append(os.path.join(os.environ['SPARK_HOME'], 'bin'))
#Parameters
filename="CDC_DAY1"
file_extension="csv"
batch_id="20191203"
## Define the DAG object
default_args = {
    'owner': 'hadoopusr',
    'depends_on_past': False,
    'start_date': datetime(2019, 12, 3),
    'retries': 5,
    'retry_delay': timedelta(minutes=1),
}
dag = DAG('MVP_BASIC_2', default_args=default_args, schedule_interval=timedelta(1))
#List of tasks
LandingToRaw = BashOperator(
    task_id='Landing_to_Raw',
    bash_command="spark-submit --master yarn-client /home/soham/Documents/mvp_landing_raw_1.py "+filename+" "+file_extension+" "+batch_id,
    dag=dag,
    run_as_user='hadoopusr')

RawToStaging = BashOperator(
    task_id='Raw_to_Staging',
    bash_command="spark-submit --master yarn-client /home/soham/Documents/mvp_raw_staging_1.py "+filename+" "+file_extension+" "+batch_id+" FULL emp_id",
    dag=dag,
    run_as_user='hadoopusr')

#Dependencies
LandingToRaw >> RawToStaging

When I test the first task of the DAG as the same user (hadoopusr), it throws an exception like the following:

Running command: spark-submit --master yarn-client /home/soham/Documents/mvp_landing_raw_1.py CDC_DAY1 csv 20191203
[2019-12-04 19:27:52,527] {bash_operator.py:124} INFO - Output:
[2019-12-04 19:27:54,937] {bash_operator.py:128} INFO - Exception in thread "main" org.apache.spark.SparkException: When running with master 'yarn-client' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.
[2019-12-04 19:27:54,938] {bash_operator.py:128} INFO -     at org.apache.spark.deploy.SparkSubmitArguments.error(SparkSubmitArguments.scala:657)
[2019-12-04 19:27:54,938] {bash_operator.py:128} INFO -     at org.apache.spark.deploy.SparkSubmitArguments.validateSubmitArguments(SparkSubmitArguments.scala:290)
[2019-12-04 19:27:54,938] {bash_operator.py:128} INFO -     at org.apache.spark.deploy.SparkSubmitArguments.validateArguments(SparkSubmitArguments.scala:251)
[2019-12-04 19:27:54,938] {bash_operator.py:128} INFO -     at org.apache.spark.deploy.SparkSubmitArguments.<init>(SparkSubmitArguments.scala:120)
[2019-12-04 19:27:54,939] {bash_operator.py:128} INFO -     at org.apache.spark.deploy.SparkSubmit$$anon$2$$anon$1.<init>(SparkSubmit.scala:907)
[2019-12-04 19:27:54,939] {bash_operator.py:128} INFO -     at org.apache.spark.deploy.SparkSubmit$$anon$2.parseArguments(SparkSubmit.scala:907)
[2019-12-04 19:27:54,939] {bash_operator.py:128} INFO -     at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:81)
[2019-12-04 19:27:54,939] {bash_operator.py:128} INFO -     at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
[2019-12-04 19:27:54,940] {bash_operator.py:128} INFO -     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
[2019-12-04 19:27:54,940] {bash_operator.py:128} INFO -     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
[2019-12-04 19:27:54,973] {bash_operator.py:132} INFO - Command exited with return code 1
[2019-12-04 19:27:54,987] {taskinstance.py:1058} ERROR - Bash command failed
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/airflow/models/taskinstance.py", line 930, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.6/dist-packages/airflow/operators/bash_operator.py", line 136, in execute
    raise AirflowException("Bash command failed")
airflow.exceptions.AirflowException: Bash command failed
[2019-12-04 19:27:54,990] {taskinstance.py:1081} INFO - Marking task as UP_FOR_RETRY
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 37, in <module>
    args.func(args)
  File "/usr/local/lib/python3.6/dist-packages/airflow/utils/cli.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/airflow/bin/cli.py", line 688, in test
    ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
  File "/usr/local/lib/python3.6/dist-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/airflow/models/taskinstance.py", line 1020, in run
    session=session)
  File "/usr/local/lib/python3.6/dist-packages/airflow/utils/db.py", line 70, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/airflow/models/taskinstance.py", line 930, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.6/dist-packages/airflow/operators/bash_operator.py", line 136, in execute
    raise AirflowException("Bash command failed")
airflow.exceptions.AirflowException: Bash command failed

Am I doing something wrong in my configuration somewhere?

apache-spark pyspark airflow directed-acyclic-graphs spark-submit
1 Answer

You are trying to run Spark in yarn-client mode, which requires a configured YARN cluster. If you are using YARN, you need to set YARN_CONF_DIR (or HADOOP_CONF_DIR) to the directory that contains your YARN configuration files, and the variable has to be visible to the process that actually launches spark-submit; the exports in hadoopusr's ~/.bashrc are not inherited by the shell the BashOperator spawns, which is most likely why the manual run works but the Airflow task does not. Alternatively, you can run Spark with the local master: --master local[num_of_threads] or --master local[*].
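A minimal sketch of what that could look like for the LandingToRaw task in the DAG above, assuming the YARN client configuration lives under /usr/local/hadoop/etc/hadoop (adjust the path to your installation). The BashOperator's env argument hands the variables to the spawned shell explicitly, so the task no longer depends on what was or was not sourced from ~/.bashrc; the filename, file_extension, batch_id and dag objects are the ones already defined in the DAG file:

import os
from airflow.operators.bash_operator import BashOperator

# Assumed location of the Hadoop/YARN client configs (core-site.xml, yarn-site.xml, ...).
# Point this at your own installation.
HADOOP_CONF = '/usr/local/hadoop/etc/hadoop'

LandingToRaw = BashOperator(
    task_id='Landing_to_Raw',
    # Spark 2.x treats 'yarn-client' as a deprecated alias for '--master yarn --deploy-mode client'.
    bash_command="spark-submit --master yarn-client "
                 "/home/soham/Documents/mvp_landing_raw_1.py "
                 + filename + " " + file_extension + " " + batch_id,
    # If env is given, BashOperator uses it *instead of* inheriting the current
    # process environment, so merge os.environ in rather than passing only the
    # two YARN variables.
    env={**os.environ,
         'HADOOP_CONF_DIR': HADOOP_CONF,
         'YARN_CONF_DIR': HADOOP_CONF},
    dag=dag,
    run_as_user='hadoopusr')

For a quick check that does not need YARN at all, swapping the master for --master local[*] runs the job on the worker host using all available cores, as suggested above.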
