如何防止气流回填dag运行?

问题描述 投票:36回答:3

假设你有一个对回填没有意义的气流DAG,这意味着,在它运行一次后,快速运行它将是完全没有意义的。

例如,如果您从一些仅每小时更新一次的数据库加载数据到数据库中,那么快速连续发生的回填只会一次又一次地导入相同的数据。

当您实例化一个新的每小时任务时,这尤其令人讨厌,并且它在它错过的每个小时运行N次数,执行冗余工作,然后在您指定的时间间隔内开始运行。

我能想到的唯一解决方案是他们在FAQ of the docs特别建议的

我们建议不要使用动态值作为start_date,尤其是datetime.now(),因为它可能非常混乱。

有没有办法禁用DAG的回填,或者我应该怎么做?

python scheduled-tasks airflow
3个回答
38
投票

升级到airflow版本1.8并在airflow.cfg中使用catchup_by_default = False或对每个dag应用catchup = False。

https://github.com/apache/incubator-airflow/blob/master/UPDATING.md#catchup_by_default


16
投票

这似乎是一个未解决的Airflow问题。我知道我真的希望拥有完全相同的功能。就我而言,这就是我的意思;它可能对其他人有用。

UI功能(至少在1.7.1.3中)可以帮助解决这个问题。如果您转到树视图并单击特定任务(方框),则会出现一个对话框按钮,其中包含“标记成功”按钮。单击“过去”,然后单击“标记成功”将在DAG中将该任务的所有实例标记为成功,并且不会运行它们。顶级DAG(顶部的圆圈)也可以以类似的方式标记为成功,但似乎没有标记多个DAG实例的方法。

我还没有深入研究它,但可以使用'trigger_dag'子命令来标记DAG的状态。看到这里:https://github.com/apache/incubator-airflow/pull/644/commits/4d30d4d79f1a18b071b585500474248e5f46d67d

用于标记DAG的CLI功能正在开发中:http://mail-archives.apache.org/mod_mbox/airflow-commits/201606.mbox/%[email protected]%3E https://github.com/apache/incubator-airflow/pull/1590

更新(2016年9月28日):添加了一个新的运营商'LatestOnlyOperator'(https://github.com/apache/incubator-airflow/pull/1752),它只运行最新版本的下游任务。听起来非常有用,希望它能很快进入发布

更新2:从气流1.8开始,LatestOnlyOperator已被释放。


4
投票

在dag声明中设置catchup = False将提供这个确切的功能。

我没有评论的“声望”,但我想说,为了这个目的,我设计了catchup = False(由我设计)。另外,我可以在1.10.1中验证它在实例化中明确设置时是否正常工作。但是当我置于默认args中时,我看不到它的工作原理。我已经离开Airflow 18个月了,所以在我看看为什么默认的args不适用于追赶之前会有一点点。

dag = DAG('example_dag',
        max_active_runs=3,
        catchup=False,
        schedule_interval=timedelta(minutes=5),
        default_args=default_args)
© www.soinside.com 2019 - 2024. All rights reserved.