我想创建一个每日 DAG,根据 DAG 执行开始的日期读取文件。该文件包含当天要处理的文件夹列表。文件夹的数量每天都会变化。如果有很多文件夹,DAG 可能需要几天时间才能完成。
对于 DAG 执行开始当天文件中的每个文件夹,我们必须执行一些不同的 DAG 任务(PythonOperator、BashOperator 等)。即使当前日期发生变化并且 DAG 花费的时间超过 24 小时,它也应该继续处理上面获得的文件夹列表。
问题是,如果 DAG 中有全局 python 变量,那么每次解析 DAG 时它们都会更新。所以如果我使用这样的变量:
DATE = datetime.now(tz).date()
当日期改变时,DATE变量的值也会改变。
因此,我尝试将日期变量保存在一个名为 DAG 的 run_id 的文件中,以便文件名始终是唯一的,并且 DAG 的 run_id 对于每次 DAG 运行都是唯一的,并且在 DAG 运行时始终保持不变。
但是,问题是获取值并将其传递给不同的任务。为此,我能够创建一个 user_define_macro 并将从文件中读取的日期传递给所有任务。
在此之后,下一个问题是我们如何列出每个日期工作要执行的任务列表。我尝试使用 for 循环,但显然它不起作用,因为 python 代码是在 DAG 解析时解析的,而不是在 DAG 执行时解析的。这是我不知道如何解决的最后一个问题。基本上循环遍历特定日期的每个文件夹。
我非常确定您已经重新发明了 Apache Airflow 和 Data Intervals 的默认行为。我想与确定性日期相关的所有内容都可以通过访问
logical_date
、data_interval_start
或 data_interval_end
来实现。您可以选择其中一种方法
from __future__ import annotations
from datetime import datetime, timezone
from typing import TYPE_CHECKING
from airflow import DAG
from airflow.decorators import task
from airflow.models import BaseOperator
from airflow.operators.python import PythonOperator, get_current_context
if TYPE_CHECKING:
from airflow.models.dagrun import DagRun
from airflow.utils.context import Context
class CustomOperator(BaseOperator):
def execute(self, context: Context):
dag_run: DagRun = context["dag_run"]
print(f"Logical Date: {dag_run.logical_date}")
print(f"Data Interval Start: {dag_run.data_interval_start}")
print(f"Data Interval End: {dag_run.data_interval_end}")
def via_callable():
context = get_current_context()
dag_run: DagRun = context["dag_run"]
print(f"Logical Date: {dag_run.logical_date}")
print(f"Data Interval Start: {dag_run.data_interval_start}")
print(f"Data Interval End: {dag_run.data_interval_end}")
with DAG(
"so_dag",
schedule="0 10 * * *",
start_date=datetime(2024, 3, 1, tzinfo=timezone.utc),
tags=["task-context"],
catchup=False,
) as dag:
@task
def task_flow_example_via_kwargs(**kwargs):
dag_run: DagRun = kwargs["dag_run"]
print(f"Logical Date: {dag_run.logical_date}")
print(f"Data Interval Start: {dag_run.data_interval_start}")
print(f"Data Interval End: {dag_run.data_interval_end}")
@task
def task_flow_example_via_runtime_magic(*, dag_run: DagRun):
print(f"Logical Date: {dag_run.logical_date}")
print(f"Data Interval Start: {dag_run.data_interval_start}")
print(f"Data Interval End: {dag_run.data_interval_end}")
task_flow_example_via_kwargs()
task_flow_example_via_runtime_magic()
CustomOperator(task_id="via_custom_operator")
PythonOperator(task_id="via_callable", python_callable=via_callable)
请注意,不仅限于上面列出的,还可以访问模板