airflow 2.9 在每个月的第一个工作日安排 DAG

问题描述 投票:0回答:1

我想在每个月的第一个工作日安排一个 DAG。例如,今年它会被安排在:

2024-01-01
2024-02-01
2024-03-01
2024-04-01
2024-05-01
2024-06-03  -----> here the third because the first is a Saturday

我已经看过了https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/timetable.html,但是构建这样的DAG似乎相当复杂。这是进行这种安排的唯一方法吗?

这是我寻找第一个工作日的逻辑:

import pendulum
year = 2024

for month in range(0,13):
    first_day_month = pendulum.DateTime(year=year,month=month,day=1)
    working_day = first_day_month if first_date_month.weekday() >= 0 and first_day_month.weekday <= 4 else first_day_month.next(pendulum.MONDAY)
    print(working_day)
python airflow
1个回答
0
投票
from datetime import datetime

import pendulum

from airflow import DAG
from airflow.decorators import task
from airflow.operators.empty import EmptyOperator
from airflow.utils.log.logging_mixin import LoggingMixin


@task.short_circuit(task_id="check-date")
def check_date(data_interval_end: datetime):
    logger = LoggingMixin().log
    year = data_interval_end.year
    month = data_interval_end.month

    # logic to find the first working day
    first_day_month = pendulum.datetime(year, month, 1)
    working_day = (
        first_day_month
        if first_day_month.weekday() <= 4
        else first_day_month.next(pendulum.MONDAY)
    )

    logger.info(f"Checking if {data_interval_end.date()} equals {working_day.date()}")
    return data_interval_end.date() == working_day.date()


with DAG(
    dag_id="check-working-day",
    schedule_interval="0 0 1-7 * *",
    start_date=datetime(2024, 1, 1),
    catchup=True,
) as dag:
    start = EmptyOperator(task_id="start_task")
    continue_task = EmptyOperator(task_id="continue_task")

    start >> check_date() >> continue_task

可以使用它来代替使用 ShortCircuitOperator 进行调度。 该函数充当 ShortCircuitOperator,根据日期检查确定是否应执行后续任务。函数 check_date 确定当月的第一个工作日。如果该月的第一天是周末(周六或周日),则计算下周一。如果执行日期与第一个工作日不匹配,任务将短路(即阻止下游任务执行)。

© www.soinside.com 2019 - 2024. All rights reserved.