如何防止 Airflow DAG 同时运行?

问题描述 投票:0回答:1

这是我的用例:

我有两个 DAG,DAG A 和 DAG B。

DAG A 每 15 分钟运行一次,DAG B 每小时运行一次。

它们在任何情况下都不能同时运行。

如果DAG A当前正在运行,DAG B启动,DAG B需要等待DAG A结束。反之亦然——如果 DAG B 正在运行,DAG A 需要等待。

我曾尝试向两个 DAG 添加带有自定义函数的 ExternalTaskSensor 以实现此行为,但最终会导致死锁。两个 DAG 最终都在等待对方。

# In DAG A:

sensor = ExternalTaskSensor(task_id='dag_sensor',
                            external_dag_id='DAG_B',
                            external_task_id=None,
                            dag=dag,
                            check_existence=True,
                            execution_date_fn=get_execution_date,
                            allowed_states=["success", "failed"],
                            mode='reschedule')

def get_execution_date():
    session = settings.Session()
    dr = session.query(DagRun)\
        .filter(DagRun.dag_id == 'DAG_B')\
        .order_by(DagRun.execution_date.desc())\
        .first()
    return dr.execution_date

# DAG B has the same code but finds DAG A's execution date

如果一个 DAG 正在等待对方,我如何告诉他们基本上放弃或屈服于另一个 DAG?

airflow deadlock
1个回答
0
投票

一个选项是设置一个只包含一个插槽的Pool,因此在任何时间点都只允许一个任务使用该插槽。

请注意,此并发限制是在 task 级别,而不是 DAG 级别,但如果您同意任务执行可能在两个 DAG 之间交错,那么在任务级别限制并发实际上是相同的。您可以像这样为 DAG 中的所有任务设置

pool

@dag(
    default_args={
        'pool': 'my_pool',
    }
)
def dag_a():
    ...
© www.soinside.com 2019 - 2024. All rights reserved.