我有两个 DAG,DAG A 和 DAG B。
DAG A 每 15 分钟运行一次,DAG B 每小时运行一次。
它们在任何情况下都不能同时运行。
如果DAG A当前正在运行,DAG B启动,DAG B需要等待DAG A结束。反之亦然——如果 DAG B 正在运行,DAG A 需要等待。
我曾尝试向两个 DAG 添加带有自定义函数的 ExternalTaskSensor 以实现此行为,但最终会导致死锁。两个 DAG 最终都在等待对方。
# In DAG A:
sensor = ExternalTaskSensor(task_id='dag_sensor',
external_dag_id='DAG_B',
external_task_id=None,
dag=dag,
check_existence=True,
execution_date_fn=get_execution_date,
allowed_states=["success", "failed"],
mode='reschedule')
def get_execution_date():
session = settings.Session()
dr = session.query(DagRun)\
.filter(DagRun.dag_id == 'DAG_B')\
.order_by(DagRun.execution_date.desc())\
.first()
return dr.execution_date
# DAG B has the same code but finds DAG A's execution date
如果一个 DAG 正在等待对方,我如何告诉他们基本上放弃或屈服于另一个 DAG?
一个选项是设置一个只包含一个插槽的Pool,因此在任何时间点都只允许一个任务使用该插槽。
请注意,此并发限制是在 task 级别,而不是 DAG 级别,但如果您同意任务执行可能在两个 DAG 之间交错,那么在任务级别限制并发实际上是相同的。您可以像这样为 DAG 中的所有任务设置
pool
:
@dag(
default_args={
'pool': 'my_pool',
}
)
def dag_a():
...