创建一个将运行多天的每日 DAG

问题描述 投票:0回答:1

我想创建一个每日 DAG,根据 DAG 执行开始的日期读取文件。该文件包含当天要处理的文件夹列表。文件夹的数量每天都会变化。如果有很多文件夹,DAG 可能需要几天时间才能完成。

对于 DAG 执行开始当天文件中的每个文件夹,我们必须执行一些不同的 DAG 任务(PythonOperator、BashOperator 等)。即使当前日期发生变化并且 DAG 花费的时间超过 24 小时,它也应该继续处理上面获得的文件夹列表。

问题是,如果 DAG 中有全局 python 变量,那么每次解析 DAG 时它们都会更新。所以如果我使用这样的变量:

    DATE = datetime.now(tz).date()

当日期改变时,DATE变量的值也会改变。

因此,我尝试将日期变量保存在一个名为 DAG 的 run_id 的文件中,以便文件名始终是唯一的,并且 DAG 的 run_id 对于每次 DAG 运行都是唯一的,并且在 DAG 运行时始终保持不变。

但是,问题是获取值并将其传递给不同的任务。为此,我能够创建一个 user_define_macro 并将从文件中读取的日期传递给所有任务。

在此之后,下一个问题是我们如何列出每个日期工作要执行的任务列表。我尝试使用 for 循环,但显然它不起作用,因为 python 代码是在 DAG 解析时解析的,而不是在 DAG 执行时解析的。这是我不知道如何解决的最后一个问题。基本上循环遍历特定日期的每个文件夹。

google-cloud-platform airflow directed-acyclic-graphs airflow-2.x
1个回答
0
投票

我非常确定您已经重新发明了 Apache Airflow 和 Data Intervals 的默认行为。我想与确定性日期相关的所有内容都可以通过访问

logical_date
data_interval_start
data_interval_end
来实现。您可以选择其中一种方法

from __future__ import annotations

from datetime import datetime, timezone
from typing import TYPE_CHECKING

from airflow import DAG
from airflow.decorators import task
from airflow.models import BaseOperator
from airflow.operators.python import PythonOperator, get_current_context

if TYPE_CHECKING:
    from airflow.models.dagrun import DagRun
    from airflow.utils.context import Context


class CustomOperator(BaseOperator):
    def execute(self, context: Context):
        dag_run: DagRun = context["dag_run"]
        print(f"Logical Date: {dag_run.logical_date}")
        print(f"Data Interval Start: {dag_run.data_interval_start}")
        print(f"Data Interval End: {dag_run.data_interval_end}")


def via_callable():
    context = get_current_context()
    dag_run: DagRun = context["dag_run"]
    print(f"Logical Date: {dag_run.logical_date}")
    print(f"Data Interval Start: {dag_run.data_interval_start}")
    print(f"Data Interval End: {dag_run.data_interval_end}")


with DAG(
    "so_dag",
    schedule="0 10 * * *",
    start_date=datetime(2024, 3, 1, tzinfo=timezone.utc),
    tags=["task-context"],
    catchup=False,
) as dag:

    @task
    def task_flow_example_via_kwargs(**kwargs):
        dag_run: DagRun = kwargs["dag_run"]
        print(f"Logical Date: {dag_run.logical_date}")
        print(f"Data Interval Start: {dag_run.data_interval_start}")
        print(f"Data Interval End: {dag_run.data_interval_end}")

    @task
    def task_flow_example_via_runtime_magic(*, dag_run: DagRun):
        print(f"Logical Date: {dag_run.logical_date}")
        print(f"Data Interval Start: {dag_run.data_interval_start}")
        print(f"Data Interval End: {dag_run.data_interval_end}")

    task_flow_example_via_kwargs()
    task_flow_example_via_runtime_magic()
    CustomOperator(task_id="via_custom_operator")
    PythonOperator(task_id="via_callable", python_callable=via_callable)

请注意,不仅限于上面列出的,还可以访问模板

© www.soinside.com 2019 - 2024. All rights reserved.