Airflow does not pick up the FAILED status of a Spark job

Problem description · votes: 0 · answers: 2

I am running Airflow on Kubernetes using this Helm chart: https://github.com/apache/airflow/tree/1.5.0

I wrote a very simple DAG just to test a few things. It looks like this:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator

default_args = {
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    'my-dag',
    default_args=default_args,
    description='simple dag',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2022, 4, 21),
    catchup=False,
    tags=['example']
) as dag:

    t1 = SparkKubernetesOperator(
        task_id='spark-pi',
        trigger_rule="all_success",
        depends_on_past=False,
        retries=3,
        application_file="spark-pi.yaml",
        namespace="my-ns",
        kubernetes_conn_id="myk8s",
        api_group="sparkoperator.k8s.io",
        api_version="v1beta2",
        do_xcom_push=True,
        dag=dag
    )

    t2 = SparkKubernetesOperator(
        task_id='other-spark-job',
        trigger_rule="all_success",
        depends_on_past=False,
        retries=3,
        application_file="other-spark-job.yaml",  # placeholder manifest path for the second job
        namespace="my-ns",
        kubernetes_conn_id="myk8s",
        api_group="sparkoperator.k8s.io",
        api_version="v1beta2",
        dag=dag
    )

    t1 >> t2

When I run the DAG from the Airflow UI, the first task's Spark job (t1, spark-pi) is created and immediately marked as successful, after which Airflow immediately starts the second task (t2). This can be seen in the Web UI:

What you see is the state of the two tasks across 5 separate DAG runs, plus their overall state (the circles). The middle row of the picture shows the state of t1, which is "success".
However, the actual spark-pi pod that the Spark operator launches for t1 fails on every run, which can be seen by querying the SparkApplication resource on Kubernetes:

$ kubectl get sparkapplications/spark-pi-2022-04-28-2 -n my-ns -o json
{
    "apiVersion": "sparkoperator.k8s.io/v1beta2",
    "kind": "SparkApplication",
    "metadata": {
        "creationTimestamp": "2022-04-29T13:28:02Z",
        "generation": 1,
        "name": "spark-pi-2022-04-28-2",
        "namespace": "my-ns",
        "resourceVersion": "111463226",
        "uid": "23f1c8fb-7843-4628-b22f-7808b562f9d8"
    },
    "spec": {
        "driver": {
            "coreLimit": "1500m",
            "cores": 1,
            "labels": {
                "version": "2.4.4"
            },
            "memory": "512m",
            "volumeMounts": [
                {
                    "mountPath": "/tmp",
                    "name": "test-volume"
                }
            ]
        },
        "executor": {
            "coreLimit": "1500m",
            "cores": 1,
            "instances": 1,
            "labels": {
                "version": "2.4.4"
            },
            "memory": "512m",
            "volumeMounts": [
                {
                    "mountPath": "/tmp",
                    "name": "test-volume"
                }
            ]
        },
        "image": "my.google.artifactory.com/spark-operator/spark:v2.4.4",
        "imagePullPolicy": "Always",
        "mainApplicationFile": "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.4.jar",
        "mainClass": "org.apache.spark.examples.SparkPi",
        "mode": "cluster",
        "restartPolicy": {
            "type": "Never"
        },
        "sparkVersion": "2.4.4",
        "type": "Scala",
        "volumes": [
            {
                "hostPath": {
                    "path": "/tmp",
                    "type": "Directory"
                },
                "name": "test-volume"
            }
        ]
    },
    "status": {
        "applicationState": {
            "errorMessage": "driver container failed with ExitCode: 1, Reason: Error",
            "state": "FAILED"
        },
        "driverInfo": {
            "podName": "spark-pi-2022-04-28-2-driver",
            "webUIAddress": "172.20.23.178:4040",
            "webUIPort": 4040,
            "webUIServiceName": "spark-pi-2022-04-28-2-ui-svc"
        },
        "executionAttempts": 1,
        "lastSubmissionAttemptTime": "2022-04-29T13:28:15Z",
        "sparkApplicationId": "spark-3335e141a51148d7af485457212eb389",
        "submissionAttempts": 1,
        "submissionID": "021e78fc-4754-4ac8-a87d-52c682ddc483",
        "terminationTime": "2022-04-29T13:28:25Z"
    }
}

As you can see in the status section, we have "state": "FAILED". Nevertheless, Airflow marks the task as successful and therefore runs t2 immediately afterwards, which is not what we want when t2 is defined as downstream of t1.

Why does Airflow consider t1 successful even though the Spark job itself failed?

apache-spark kubernetes airflow kubernetes-helm
2 Answers

1 vote

That is how the operator is implemented. If you look at its code, it is basically a submit-and-forget job. To monitor the status, use a SparkKubernetesSensor:

t2 = SparkKubernetesSensor(
    task_id="spark_monitor",
    application_name="{{ task_instance.xcom_pull(task_ids='spark-job-full-refresh.spark_full_refresh')['metadata']['name'] }}",
    attach_log=True,
)

I tried to create a custom operator combining the two, but it does not work well through inheritance because their execution patterns differ slightly, so it would need to be written from scratch. For all intents and purposes, though, the sensor works perfectly; it just adds lines to the code that you would rather not need.
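For reference, what the sensor does boils down to a poll-until-terminal loop. The following is a plain-Python sketch of that pattern with the Kubernetes lookup mocked out as a callable (in the real sensor it is a custom-object GET against the sparkoperator.k8s.io API); names like wait_for_spark_app are illustrative, not part of any library:

```python
import time

# Terminal states reported in a SparkApplication's applicationState.state
TERMINAL_STATES = {"COMPLETED", "FAILED", "SUBMISSION_FAILED"}

def wait_for_spark_app(get_status, poke_interval=5, timeout=600):
    """Poll a SparkApplication until it reaches a terminal state.

    get_status is a callable returning the applicationState.state string;
    in the real sensor this is a Kubernetes API call. Raising on failure is
    what makes the surrounding task fail, unlike the fire-and-forget operator.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        state = get_status()
        if state in TERMINAL_STATES:
            if state != "COMPLETED":
                raise RuntimeError(f"Spark application ended in state {state}")
            return state
        time.sleep(poke_interval)
    raise TimeoutError("Spark application did not reach a terminal state")

# Example with a canned status sequence:
states = iter(["SUBMITTED", "RUNNING", "COMPLETED"])
wait_for_spark_app(lambda: next(states), poke_interval=0)  # returns "COMPLETED"
```

A FAILED state raises, so the sensor task (and thus the DAG run) goes red instead of silently succeeding.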


0 votes

I am deploying a Spark app in k8s with Airflow 2.6.0 using SparkKubernetesOperator. In this setup, even when the SparkApplication is created but then fails for whatever reason, the task is marked as successful.

I tried using a SparkKubernetesSensor to track the Spark job's status, but the sensor task fails with this error:

  File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/jinja2/runtime.py", line 852, in _fail_with_undefined_error
    raise self._undefined_exception(self._undefined_message)
jinja2.exceptions.UndefinedError: 'None' has no attribute 'metadata'
[2024-03-13T08:17:39.510+0000] {taskinstance.py:1368} INFO - Marking task

After checking, the SparkKubernetesOperator task's XCom is empty.

I also checked a task that just uses a BashOperator to echo hello; in that case XCom does return a value.
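The Jinja error above follows directly from the empty XCom: xcom_pull returns None, and subscripting None inside the template is what surfaces as "'None' has no attribute 'metadata'". A plain-Python sketch of the same failure mode (resolve_app_name is a hypothetical stand-in for the template expression):

```python
def resolve_app_name(xcom_value):
    """Mirror of the template {{ task_instance.xcom_pull(...)['metadata']['name'] }}.

    When the upstream operator pushed nothing, xcom_pull returns None,
    so the subscript below is what blows up during template rendering.
    """
    return xcom_value["metadata"]["name"]

# With a populated XCom this works:
resolve_app_name({"metadata": {"name": "sample-spark"}})  # → "sample-spark"

# With an empty XCom it raises, just like the sensor's template rendering:
try:
    resolve_app_name(None)
except TypeError:
    pass  # 'NoneType' object is not subscriptable
```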

This is the DAG:

from airflow import DAG
from datetime import timedelta, datetime
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor

default_args = {
    'depends_on_past': False,
}

with DAG(
    'spark-dag',
    default_args=default_args,
    description='sample spark dag',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2022, 11, 17),
    catchup=False,
    tags=['maps']
) as dag:
    t1 = SparkKubernetesOperator(
        task_id='sample-spark',
        application_file="spark/sample-spark/sample-spark.yaml",
        namespace="incentives",
        do_xcom_push=True,
        dag=dag
    )

    t2 = SparkKubernetesSensor(
        task_id='spark-sensor',
        namespace="incentives",
        application_name="{{ task_instance.xcom_pull(task_ids='sample-spark')['metadata']['name'] }}",
        attach_log=True,
        poke_interval=5,
        dag=dag
    )

    t1 >> t2
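One way to sidestep the empty-XCom dependency entirely is to derive application_name from the same manifest file the operator applies, rather than templating it from xcom_pull. A naive, hypothetical helper (assumes metadata.name is the first name: key after metadata:, as in the SparkApplication manifests above):

```python
import re

def app_name_from_manifest(manifest: str) -> str:
    """Extract metadata.name from a SparkApplication manifest.

    Naive string-based parse: takes the first 'name:' key that appears
    after the 'metadata:' block. Good enough for simple manifests like
    the ones in this question; a real implementation would parse YAML.
    """
    meta = manifest.split("metadata:", 1)[1]
    match = re.search(r"^\s*name:\s*(\S+)", meta, re.MULTILINE)
    if not match:
        raise ValueError("no metadata.name found in manifest")
    return match.group(1)

manifest = """\
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: sample-spark
  namespace: incentives
"""
```

The extracted name could then be passed as a plain string to the sensor's application_name, so the sensor no longer depends on the operator pushing anything to XCom.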