Spark job succeeds in Airflow, but no result shows up in the Spark UI


I am a beginner with Airflow and Spark, and I am currently setting up a local data pipeline with them. The DAG I want to build has a single task that runs a PySpark job on Spark.

My application's dags folder contains two files:

dag-airflow-spark-submitop.py:

import airflow
from datetime import datetime, timedelta
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

default_args = {
    'owner': 'moi',
    'depends_on_past': True,
    'retries': 5,
    'retry_delay': timedelta(minutes=30),
    'start_date': datetime(year=2023, month=7, day=9),
}

with airflow.DAG('dag_teste_spark_connection', default_args=default_args, schedule_interval='0 1 * * *') as dag:
    task_elt_documento_pagar = SparkSubmitOperator(
        task_id='task_run_pysparkjob',
        conn_id='spark',
        application="./dags/sparkjob.py",
    )

sparkjob.py:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("test") \
    .getOrCreate()

data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]
df = spark.createDataFrame(data)
df.show()

So the first step is to start my Spark services (the master and a worker), whose UI is reachable at localhost:8080, as shown below:
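Roughly, I start them with the standalone scripts that ship with Spark 3.x (a sketch; $SPARK_HOME and the master host name are specific to my machine):

# Start the standalone master; its web UI is on localhost:8080 by default
$SPARK_HOME/sbin/start-master.sh

# Start a worker and register it with the master (host name is my machine's)
$SPARK_HOME/sbin/start-worker.sh spark://MacBook-Pro-de-AdminSensei-9.local:7077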

Then Airflow:
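Airflow itself runs in Docker on my side (the container host name ef29c1050b71 appears in the task log below); a sketch of how it is brought up, assuming the official docker-compose.yaml from the Airflow docs:

# One-time initialization, then start all Airflow services in the background
docker compose up airflow-init
docker compose up -d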

In the UI you can see my DAG "dag_teste_spark_connection".

Here is my Spark connection:
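For reference, a sketch of how an equivalent connection could be created from the Airflow CLI (values taken from my setup; note that the Spark hook appends the connection's port to the host when building the master URL, which seems to match the doubled ":7077:7077" visible in the spark-submit command in the task log below):

airflow connections add spark \
    --conn-type spark \
    --conn-host spark://MacBook-Pro-de-AdminSensei-9.local \
    --conn-port 7077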

The problem is that when I run the DAG, it shows success in the Airflow UI, but I cannot see the completed job in the Spark UI:

As you can see in this screenshot, there are no completed applications:

Here are the task details after I clicked the toggle button:

And here is the task log:

*** Found local files:
***   * /opt/airflow/logs/dag_id=dag_teste_spark_connection/run_id=manual__2023-07-09T14:53:27.424066+00:00/task_id=task_run_pysparkjob/attempt=1.log
[2023-07-09, 14:53:37 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: dag_teste_spark_connection.task_run_pysparkjob manual__2023-07-09T14:53:27.424066+00:00 [queued]>
[2023-07-09, 14:53:37 UTC] {taskinstance.py:1103} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: dag_teste_spark_connection.task_run_pysparkjob manual__2023-07-09T14:53:27.424066+00:00 [queued]>
[2023-07-09, 14:53:37 UTC] {taskinstance.py:1308} INFO - Starting attempt 1 of 6
[2023-07-09, 14:53:37 UTC] {taskinstance.py:1327} INFO - Executing <Task(SparkSubmitOperator): task_run_pysparkjob> on 2023-07-09 14:53:27.424066+00:00
[2023-07-09, 14:53:37 UTC] {standard_task_runner.py:57} INFO - Started process 278 to run task
[2023-07-09, 14:53:37 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'dag_teste_spark_connection', 'task_run_pysparkjob', 'manual__2023-07-09T14:53:27.424066+00:00', '--job-id', '98', '--raw', '--subdir', 'DAGS_FOLDER/dag-***-spark-submitop.py', '--cfg-path', '/tmp/tmpuzlf2_yf']
[2023-07-09, 14:53:37 UTC] {standard_task_runner.py:85} INFO - Job 98: Subtask task_run_pysparkjob
[2023-07-09, 14:53:37 UTC] {task_command.py:410} INFO - Running <TaskInstance: dag_teste_spark_connection.task_run_pysparkjob manual__2023-07-09T14:53:27.424066+00:00 [running]> on host ef29c1050b71
[2023-07-09, 14:53:37 UTC] {taskinstance.py:1547} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='moi' AIRFLOW_CTX_DAG_ID='dag_teste_spark_connection' AIRFLOW_CTX_TASK_ID='task_run_pysparkjob' AIRFLOW_CTX_EXECUTION_DATE='2023-07-09T14:53:27.424066+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-07-09T14:53:27.424066+00:00'
[2023-07-09, 14:53:37 UTC] {base.py:73} INFO - Using connection ID 'spark' for task execution.
[2023-07-09, 14:53:37 UTC] {spark_submit.py:339} INFO - Spark-Submit cmd: spark-submit --master spark://MacBook-Pro-de-AdminSensei-9.local:7077:7077 --name arrow-spark ./dags/sparkjob.py
[2023-07-09, 14:53:38 UTC] {spark_submit.py:490} INFO - /home/***/.local/lib/python3.7/site-packages/pyspark/bin/load-spark-env.sh: line 68: ps: command not found
[2023-07-09, 14:53:38 UTC] {spark_submit.py:490} INFO - JAVA_HOME is not set
[2023-07-09, 14:53:38 UTC] {taskinstance.py:1824} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/apache/spark/operators/spark_submit.py", line 157, in execute
    self._hook.submit(self._application)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/apache/spark/hooks/spark_submit.py", line 422, in submit
    f"Cannot execute: {self._mask_cmd(spark_submit_cmd)}. Error code is: {returncode}."
airflow.exceptions.AirflowException: Cannot execute: spark-submit --master spark://MacBook-Pro-de-AdminSensei-9.local:7077:7077 --name arrow-spark ./dags/sparkjob.py. Error code is: 1.
[2023-07-09, 14:53:38 UTC] {taskinstance.py:1350} INFO - Marking task as UP_FOR_RETRY. dag_id=dag_teste_spark_connection, task_id=task_run_pysparkjob, execution_date=20230709T145327, start_date=20230709T145337, end_date=20230709T145338
[2023-07-09, 14:53:38 UTC] {standard_task_runner.py:109} ERROR - Failed to execute job 98 for task task_run_pysparkjob (Cannot execute: spark-submit --master spark://MacBook-Pro-de-AdminSensei-9.local:7077:7077 --name arrow-spark ./dags/sparkjob.py. Error code is: 1.; 278)
[2023-07-09, 14:53:38 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code 1
[2023-07-09, 14:53:38 UTC] {taskinstance.py:2651} INFO - 0 downstream tasks scheduled from follow-on schedule check

Could you help me?

pyspark airflow spark-submit
1 Answer

By default, newly created DAGs are paused. To activate yours, click the toggle button next to the DAG's name on the DAGs page.

In the screenshot you attached you can see that all the DAGs are paused and that the task is not in the success state (it is not green).
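The same can also be done from the CLI if that is more convenient (a sketch):

# Unpause the DAG so the scheduler starts picking it up
airflow dags unpause dag_teste_spark_connection

# Optionally trigger a run right away
airflow dags trigger dag_teste_spark_connection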
