I want to schedule a DAG on the first working day of each month. For example, this year it would be scheduled on:
2024-01-01
2024-02-01
2024-03-01
2024-04-01
2024-05-01
2024-06-03 (the third, because the first is a Saturday)
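I have already looked at https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/timetable.html, but building a DAG that way seems rather complicated. Is that the only way to get this kind of schedule?
Here is my logic for finding the first working day: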
import pendulum

year = 2024
for month in range(1, 13):  # months run from 1 to 12, not 0 to 12
    first_day_month = pendulum.datetime(year, month, 1)
    # weekday() is 0 for Monday through 6 for Sunday, so 0-4 are working days
    working_day = first_day_month if first_day_month.weekday() <= 4 else first_day_month.next(pendulum.MONDAY)
    print(working_day)
from datetime import datetime

import pendulum
from airflow import DAG
from airflow.decorators import task
from airflow.operators.empty import EmptyOperator
from airflow.utils.log.logging_mixin import LoggingMixin


@task.short_circuit(task_id="check-date")
def check_date(data_interval_end: datetime):
    logger = LoggingMixin().log
    year = data_interval_end.year
    month = data_interval_end.month
    # logic to find the first working day
    first_day_month = pendulum.datetime(year, month, 1)
    working_day = (
        first_day_month
        if first_day_month.weekday() <= 4
        else first_day_month.next(pendulum.MONDAY)
    )
    logger.info(f"Checking if {data_interval_end.date()} equals {working_day.date()}")
    # returning False skips the downstream tasks
    return data_interval_end.date() == working_day.date()


# trigger at midnight on days 1-7 of every month; check_date filters the runs
with DAG(
    dag_id="check-working-day",
    schedule_interval="0 0 1-7 * *",
    start_date=datetime(2024, 1, 1),
    catchup=True,
) as dag:
    start = EmptyOperator(task_id="start_task")
    continue_task = EmptyOperator(task_id="continue_task")
    start >> check_date() >> continue_task
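You can use this instead of a custom timetable: schedule the DAG on the first seven days of each month and gate it with a ShortCircuitOperator. The decorated function check_date acts as the ShortCircuitOperator and decides, based on a date check, whether the downstream tasks should run. It determines the first working day of the current month: if the first day of the month falls on a weekend (Saturday or Sunday), it takes the following Monday instead. If the end of the run's data interval does not match that first working day, the task short-circuits, that is, the downstream tasks are skipped.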
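To see why triggering on days 1-7 and gating with the date check leaves exactly one effective run per month, the comparison can be replayed outside Airflow. The following is a minimal sketch using only the standard library; `first_working_day`, `passes_gate`, and `accepted_runs` are hypothetical helper names that are not part of the DAG above, and public holidays are ignored, as in the original logic.

```python
from datetime import date, timedelta


def first_working_day(year: int, month: int) -> date:
    """Return the first Monday-to-Friday date of the month (holidays ignored)."""
    day = date(year, month, 1)
    # weekday(): Monday == 0 ... Sunday == 6, so 5 and 6 are the weekend.
    while day.weekday() > 4:
        day += timedelta(days=1)
    return day


def passes_gate(run_date: date) -> bool:
    """Mirror the comparison made in check_date (assumed equivalent, not Airflow code)."""
    return run_date == first_working_day(run_date.year, run_date.month)


def accepted_runs(year: int) -> list[date]:
    """Replay the cron trigger (days 1-7 of every month) and keep the runs that pass."""
    candidates = (
        date(year, month, day)
        for month in range(1, 13)
        for day in range(1, 8)
    )
    return [d for d in candidates if passes_gate(d)]


if __name__ == "__main__":
    for run in accepted_runs(2024):
        print(run.isoformat())
```

For 2024 this prints the six dates listed in the question, followed by 2024-07-01, 2024-08-01, 2024-09-02, 2024-10-01, 2024-11-01 and 2024-12-02. Under this weekend-only rule the first working day never falls later than the 3rd, so a narrower cron range such as `1-3` would presumably work as well.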