替换 prev_execution_date,因为它在气流 2 中已弃用

问题描述 投票:0回答:1

我正在将气流 1 管道迁移到气流 2 上,并偶然发现了已弃用的

{{ prev_execution_date }}

我不知道可以用什么来代替。
我找到了

prev_data_interval_start_success
prev_data_interval_end_success
,但不确定从这些中选择哪一个,或者它们是否是
prev_execution_date
的正确替代品?

还有其他模板变量,例如

execution_date
,其替换为
logical_date

文档链接:https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html

python airflow jinja2 airflow-2.x
1个回答
0
投票

只需使用

TaskInstance
DagRun
型号。这是一个例子:

import logging
from datetime import datetime

from airflow import DAG
from airflow.models import TaskInstance, DagRun
from airflow.operators.python import PythonOperator

dag = DAG(
    dag_id='test_dag',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily',
    max_active_runs=1,
)


def first(ti: TaskInstance, **kwargs: dict):
    if ti.previous_ti:
        logging.info('ti.previous_ti.execution_date = %s', ti.previous_ti.execution_date)
    else:
        logging.info('previous_ti not found')

    dag_run = DagRun.get_previous_dagrun(dag_run=ti.dag_run)
    if dag_run:
        logging.info('dag_run = %s', dag_run.execution_date)
    else:
        logging.info('dag_run not found')


PythonOperator(dag=dag, task_id='first', python_callable=first)

让我们检查日志:

# first dag_run
...
[2024-02-22, 09:44:39 UTC] {test_dag.py:20} INFO - previous_ti not found
[2024-02-22, 09:44:39 UTC] {test_dag.py:26} INFO - dag_run not found
...
# second dag_run
[2024-02-22, 09:44:41 UTC] {test_dag.py:18} INFO - ti.previous_ti.execution_date = 2024-01-01 00:00:00+00:00
[2024-02-22, 09:44:41 UTC] {test_dag.py:24} INFO - dag_run = 2024-01-01 00:00:00+00:00
...
© www.soinside.com 2019 - 2024. All rights reserved.