有这样的达格:
import os
from datetime import timedelta
from xxx import on_failure_opsgenie
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
DAG_ID = os.path.basename(__file__).replace(".py", "")
DEFAULT_ARGS = {
"owner": "airflow",
"depends_on_past": False,
"email": ["[email protected]"],
"email_on_failure": False,
"email_on_retry": False,
}
def kaboom(*args, **kwargs):
print("goodbye cruel world")
print(args)
print(kwargs)
assert 1 == 2
with DAG(
dag_id=DAG_ID,
default_args=DEFAULT_ARGS,
description="Print contents of airflow.cfg to logs",
dagrun_timeout=timedelta(hours=2),
start_date=days_ago(1),
schedule_interval=None,
on_failure_callback=on_failure_opsgenie,
) as dag:
get_airflow_cfg_operator = PythonOperator(task_id="gonna_explode", python_callable=kaboom)
DAG 按预期故意失败。然而,
on_failure_opsgenie
并没有做它应该做的事;如何在 AWS MWAA 中获取日志或调试失败的失败回调?
第一步是查看
on_failure_opsgenie
的定义。如果内存可用,回调中的异常仍应打印到任务日志中。但是,如果他们不这样做,您可以创建一个 PythonOperator
并传递 python_callable=on_failure_opsgenie
。每次任务运行时,它都会尝试执行您的回调,任何异常都应该最终记录在日志中。然后你就可以从那里开始。
我发现让这些回调尽可能简单也很有帮助,这样当它们失败时,问题出在哪里或多或少是显而易见的。
on_*_callback
日志位于调度程序级别日志,您可以在 Airflow 实例日志文件夹中搜索它们。如果您在本地执行 Airflow,您可能会在 Docker 实例上运行它,这样您就可以在类似 /opt/airflow/logs/scheduler/latest/<dag_folder>
的内容中查找日志文件夹。
例如做这样的事情:
docker exec -it <instance_name> cat opt/airflow/logs/scheduler/latest/<dag_folder>
并在控制台上查找
logging_mixin.py
日志。在那里您可以找到有关可能损坏的内容的更多信息