配置一个任务以在对上游任务的每次更改时重复

问题描述 投票:0回答:1

我正在设计一个 DAG,它处理多个数据源并输出报告。 DAG 作为一个整体,每天都会触发。示意图:

src1 --┐
src2 --┤
...  --┼---> report
srcN --┘

默认情况下,任务会等待所有上游任务成功完成。就我而言,源任务在不同时间触发并且具有不一致的运行时间。然而,我希望在 Airflow 之外获得一份反映系统最新状态的报告。因此,我想多次触发

report
任务 - 最初是在 DAG 初始化时(以创建占位符报告),稍后每当每个
src
任务完成时。

出于该问题的目的,假设所有

report
任务所做的就是提取所有上游任务的状态并将它们以
src1: {N/A, success, failure}
条目的形式存储在文件中。

为了在 Airflow 之外实现此目的,我可能会在源任务和

report
之间设置一个发布-订阅通道,并让每个任务在完成时发出通知(无论结果如何),以及订阅的报告服务对于这些通知,将在收到通知时运行。

我想到的一个解决方案是在单独的 DAG 中创建一个定期运行的任务,该任务分别查找每个

src
任务生成的一些输出文件,当文件丢失时分配 N/A。因此,实现了类似于自动更新报告的用户体验。然后,
report
任务实际上与该 DAG 断开连接,并成为未连接的任务{组}的集合。

几个问题:

  1. 我想要的工作流程适合 Airflow 吗?
  2. 假设这是可行的,我应该如何配置 DAG 本身和/或
    report
    任务来实现我想要的?我考虑了report的各种
    触发规则
    ,但这些似乎只会触发
    report
    一次。
  3. 为了创建占位符报告,我想创建一个虚拟任务,该任务立即完成并导致
    report
    使用任何机制运行,并且
    report
    将忽略报告中的虚拟任务。有没有更直接的方法来实现这一目标?
dependencies airflow task repeat directed-acyclic-graphs
1个回答
0
投票

目前还没有一种简单的方法可以做到这一点,但有一个 开放功能请求。如果你想从事这方面的工作那就太好了!

就像 @vdolez 提到的那样,您可以将

report
分离到自己的 DAG 中,然后在所有
srcN
任务下游拥有一个 TriggerDagRunOperator,其触发规则
none_failed

© www.soinside.com 2019 - 2024. All rights reserved.