我正在将气流 1 管道迁移到气流 2 上,并偶然发现了已弃用的
{{ prev_execution_date }}
。
我不知道可以用什么来代替。
我找到了
prev_data_interval_start_success
和 prev_data_interval_end_success
,但不确定从这些中选择哪一个,或者它们是否是 prev_execution_date
的正确替代品?
还有其他模板变量,例如
execution_date
,其替换为 logical_date
。
文档链接:https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html
只需使用
TaskInstance
或 DagRun
型号。这是一个例子:
import logging
from datetime import datetime
from airflow import DAG
from airflow.models import TaskInstance, DagRun
from airflow.operators.python import PythonOperator
dag = DAG(
dag_id='test_dag',
start_date=datetime(2024, 1, 1),
schedule_interval='@daily',
max_active_runs=1,
)
def first(ti: TaskInstance, **kwargs: dict):
if ti.previous_ti:
logging.info('ti.previous_ti.execution_date = %s', ti.previous_ti.execution_date)
else:
logging.info('previous_ti not found')
dag_run = DagRun.get_previous_dagrun(dag_run=ti.dag_run)
if dag_run:
logging.info('dag_run = %s', dag_run.execution_date)
else:
logging.info('dag_run not found')
PythonOperator(dag=dag, task_id='first', python_callable=first)
让我们检查日志:
# first dag_run
...
[2024-02-22, 09:44:39 UTC] {test_dag.py:20} INFO - previous_ti not found
[2024-02-22, 09:44:39 UTC] {test_dag.py:26} INFO - dag_run not found
...
# second dag_run
[2024-02-22, 09:44:41 UTC] {test_dag.py:18} INFO - ti.previous_ti.execution_date = 2024-01-01 00:00:00+00:00
[2024-02-22, 09:44:41 UTC] {test_dag.py:24} INFO - dag_run = 2024-01-01 00:00:00+00:00
...