如何指示气流从最近到最旧的回填

问题描述 投票:2回答:4

我有一个Airflow DAG计划每天运行。当我在上个月开始回填时,Airflow将开始处理从最旧到最新的运行。由于单次运行需要几个小时,这意味着当新的运行可用时(通过回填处理一天),新运行将仅在整个回填完成后处理(导致最近的数据为不适用于公司)。是否可以指示Airflow处理从最近到最旧的运行?

airflow airflow-scheduler
4个回答
1
投票

我不认为使用Airflow标准组件是可行的。

根据任务量,您可以将所有任务设置为成功状态。运行完成后,只需清除状态并导入日期。


0
投票

Airflow将确定计划创建的DAG运行的日期,该日期是针对该dag的最新dag运行。

一个解决方案,虽然是一个混乱的解决方案,是为今天手动创建DAG运行(确保您完全匹配Dag Id,并使用一致的Run Id格式,如调度程序使用)。这将迫使Airflow跳过DAG运行,这应该发生在新的DAG运行执行日期之前。

然后,您可以复制DAG本身,重命名它,并设置开始和结束日期。开始日期应该是回填应该开始的时间,结束日期应该是您为手动DAG运行设置的执行日期之前的日期/时间。 (在它之前的第二个罚款)

这将使您的主DAG保持最新,同时回填数据。但是,这样做会使您的DAG历史记录分为两个位置。如果你真的在乎你可能会写一些SQL来合并它。它可能不适用于每个用例,具体取决于您的DAG的设置方式,但可能是您的解决方案。


0
投票

对您的问题的简短回答是否定的,这不是今天支持的Airflow功能。在DAG主要积压后,我们中的一些人在类似情况下对此功能有类似的愿望,因此可能值得在Airflow Jira上添加票证或在Airflow mailing list上开始一个线程以收集更多输入。 (毕竟,也许这是一个很常见的场景,我们应该考虑正式支持它。)

同时你可以做的一件事就是让所有的回填都被创建,根据你拥有的数量将每个回填标记为手动/编程失败。然后,重新运行失败的DAG从最新的第一次运行而不是正常的最早的第一次运行。这不像内置功能那么容易,但我在类似情况下使用它作为解决方法。

触发“自动失败的DAG运行”的一个方法是添加一行,将一个异常作为DAG中第一个任务的第一行,然后在创建所有回填DAG运行后删除该行。


0
投票

有一个被标记为已解决的feature request

从故障单详细信息看,Airflow 1.10.3可以提供此信息。在撰写本文时,它有yet to be released,但可能很快就会出现。

票证评论中显示了该用法:

通过在调度程序部分下设置backfill_dagrun_order_reverse = True来反向创建回填dagrun


0
投票

您可以在Airflow 1.10.3中执行此操作

https://airflow.apache.org/cli.html#backfill

airflow backfill --run_backwards dag_id
© www.soinside.com 2019 - 2024. All rights reserved.