I am running Airflow on Kubernetes using this Helm chart: https://github.com/apache/airflow/tree/1.5.0
I wrote a very simple DAG just to test a few things. It looks like this:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator

default_args = {
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    'my-dag',
    default_args=default_args,
    description='simple dag',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2022, 4, 21),
    catchup=False,
    tags=['example']
) as dag:
    t1 = SparkKubernetesOperator(
        task_id='spark-pi',
        trigger_rule="all_success",
        depends_on_past=False,
        retries=3,
        application_file="spark-pi.yaml",
        namespace="my-ns",
        kubernetes_conn_id="myk8s",
        api_group="sparkoperator.k8s.io",
        api_version="v1beta2",
        do_xcom_push=True,
        dag=dag
    )
    t2 = SparkKubernetesOperator(
        task_id='other-spark-job',
        trigger_rule="all_success",
        depends_on_past=False,
        retries=3,
        application_file=other_spark_job_definition,  # placeholder, defined elsewhere
        namespace="my-ns",
        kubernetes_conn_id="myk8s",
        api_group="sparkoperator.k8s.io",
        api_version="v1beta2",
        dag=dag
    )
    t1 >> t2
When I run the DAG from the Airflow UI, the first task's Spark job (t1, spark-pi) is created and immediately marked as successful, and Airflow then launches the second task (t2) right away. This can be seen in the web UI:
What you see there is the status of the two tasks across 5 separate DAG runs, along with their overall status (the circles). The middle row of the picture shows the status of t1, which is "success".
However, the actual spark-pi pod that the Spark operator launches for t1 fails on every run, which can be seen by querying the SparkApplication resource on Kubernetes for its status:
$ kubectl get sparkapplications/spark-pi-2022-04-28-2 -n my-ns -o json
{
"apiVersion": "sparkoperator.k8s.io/v1beta2",
"kind": "SparkApplication",
"metadata": {
"creationTimestamp": "2022-04-29T13:28:02Z",
"generation": 1,
"name": "spark-pi-2022-04-28-2",
"namespace": "my-ns",
"resourceVersion": "111463226",
"uid": "23f1c8fb-7843-4628-b22f-7808b562f9d8"
},
"spec": {
"driver": {
"coreLimit": "1500m",
"cores": 1,
"labels": {
"version": "2.4.4"
},
"memory": "512m",
"volumeMounts": [
{
"mountPath": "/tmp",
"name": "test-volume"
}
]
},
"executor": {
"coreLimit": "1500m",
"cores": 1,
"instances": 1,
"labels": {
"version": "2.4.4"
},
"memory": "512m",
"volumeMounts": [
{
"mountPath": "/tmp",
"name": "test-volume"
}
]
},
"image": "my.google.artifactory.com/spark-operator/spark:v2.4.4",
"imagePullPolicy": "Always",
"mainApplicationFile": "local:///opt/spark/examples/jars/spark-examples_2.11-2.4.4.jar",
"mainClass": "org.apache.spark.examples.SparkPi",
"mode": "cluster",
"restartPolicy": {
"type": "Never"
},
"sparkVersion": "2.4.4",
"type": "Scala",
"volumes": [
{
"hostPath": {
"path": "/tmp",
"type": "Directory"
},
"name": "test-volume"
}
]
},
"status": {
"applicationState": {
"errorMessage": "driver container failed with ExitCode: 1, Reason: Error",
"state": "FAILED"
},
"driverInfo": {
"podName": "spark-pi-2022-04-28-2-driver",
"webUIAddress": "172.20.23.178:4040",
"webUIPort": 4040,
"webUIServiceName": "spark-pi-2022-04-28-2-ui-svc"
},
"executionAttempts": 1,
"lastSubmissionAttemptTime": "2022-04-29T13:28:15Z",
"sparkApplicationId": "spark-3335e141a51148d7af485457212eb389",
"submissionAttempts": 1,
"submissionID": "021e78fc-4754-4ac8-a87d-52c682ddc483",
"terminationTime": "2022-04-29T13:28:25Z"
}
}
As you can see in the status section, it has "state": "FAILED". Despite that, Airflow marks the task as successful and therefore runs t2 immediately afterwards, which is not what we want when t2 is defined as downstream of t1.
Why does Airflow consider t1 successful even though the Spark job itself failed?
That is simply how it is implemented. If you look at the operator's code, it is essentially a fire-and-forget submission. To monitor the status, use a SparkKubernetesSensor:
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor

t2 = SparkKubernetesSensor(
    task_id="spark_monitor",
    application_name="{{ task_instance.xcom_pull(task_ids='spark-job-full-refresh.spark_full_refresh')['metadata']['name'] }}",
    attach_log=True,
)
I tried creating a custom operator that combines the two, but it does not work well through inheritance because their execution models differ slightly, so it would have to be written from scratch (a rough sketch of that idea follows below). For all intents and purposes, though, the sensor works perfectly; it just adds lines to the code that should not be needed.
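To illustrate that from-scratch idea, here is a rough, hypothetical sketch of a combined submit-and-wait operator. It is not the provider's code: it assumes the provider's KubernetesHook helpers create_custom_object and get_custom_object (the same calls the built-in operator and sensor make), and it assumes application_file already holds the manifest's YAML text rather than a file path. Names and defaults may differ between provider versions.

import time

import yaml
from airflow.models import BaseOperator
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook


class SparkSubmitAndWaitOperator(BaseOperator):
    """Hypothetical sketch: submit a SparkApplication, then block until it ends."""

    template_fields = ("application_file",)

    def __init__(self, *, application_file, namespace,
                 kubernetes_conn_id="kubernetes_default",
                 poll_interval=30.0, **kwargs):
        super().__init__(**kwargs)
        self.application_file = application_file
        self.namespace = namespace
        self.kubernetes_conn_id = kubernetes_conn_id
        self.poll_interval = poll_interval

    def execute(self, context):
        hook = KubernetesHook(conn_id=self.kubernetes_conn_id)
        # Submit the SparkApplication, as the built-in operator does.
        resp = hook.create_custom_object(
            group="sparkoperator.k8s.io",
            version="v1beta2",
            plural="sparkapplications",
            body=yaml.safe_load(self.application_file),
            namespace=self.namespace,
        )
        name = resp["metadata"]["name"]
        # Poll status.applicationState.state until a terminal value shows up,
        # failing the Airflow task on FAILED instead of forgetting the job.
        while True:
            app = hook.get_custom_object(
                group="sparkoperator.k8s.io",
                version="v1beta2",
                plural="sparkapplications",
                name=name,
                namespace=self.namespace,
            )
            state = (app.get("status") or {}).get("applicationState", {}).get("state")
            if state == "COMPLETED":
                return name
            if state in ("FAILED", "SUBMISSION_FAILED"):
                raise RuntimeError(f"SparkApplication {name} ended in state {state}")
            time.sleep(self.poll_interval)

The polling loop is exactly what the sensor already provides out of the box, which is why the operator-plus-sensor pair remains the simpler option.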
I am deploying a Spark app on Kubernetes with Airflow 2.6.0 using the SparkKubernetesOperator. Here too, even when the SparkApplication is created but then fails for whatever reason, the task is marked as successful.
I tried using a SparkKubernetesSensor to track the Spark job's status, but the sensor task fails with this error:
File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/jinja2/runtime.py", line 852, in _fail_with_undefined_error
raise self._undefined_exception(self._undefined_message)
jinja2.exceptions.UndefinedError: 'None' has no attribute 'metadata'
[2024-03-13T08:17:39.510+0000] {taskinstance.py:1368} INFO - Marking task
On inspection, the XCom of the SparkKubernetesOperator task is empty, so the xcom_pull in the sensor's template renders as None and the ['metadata'] lookup fails with the error above.
I also checked a task that just echoes hello with a BashOperator, and in that case XCom does return a value.
Here is the DAG:
from airflow import DAG
from datetime import timedelta, datetime
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor
default_args={
'depends_on_past': False,
}
with DAG(
'spark-dag',
default_args=default_args,
description='sample spark dag',
schedule_interval=timedelta(days=1),
start_date=datetime(2022, 11, 17),
catchup=False,
tags=['maps']
) as dag:
t1 = SparkKubernetesOperator(
task_id='sample-spark',
application_file="spark/sample-spark/sample-spark.yaml",
namespace="incentives",
do_xcom_push=True,
dag=dag
)
t2 = SparkKubernetesSensor(
task_id='spark-sensor',
namespace="incentives",
application_name="{{ task_instance.xcom_pull(task_ids='sample-spark')['metadata']['name'] }}",
attach_log=True,
poke_interval=5,
dag=dag
)
t1 >> t2
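As a workaround for the empty XCom, the sensor does not have to get the application name from XCom at all. If metadata.name in spark/sample-spark/sample-spark.yaml is a static value, it can be passed directly; a sketch, assuming (hypothetically) that the manifest names the application sample-spark:

t2 = SparkKubernetesSensor(
    task_id='spark-sensor',
    namespace="incentives",
    # Assumption: the manifest's metadata.name is the static string
    # "sample-spark"; use whatever name sample-spark.yaml actually declares.
    application_name="sample-spark",
    attach_log=True,
    poke_interval=5,
    dag=dag
)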