我正在设计一个 DAG,它处理多个数据源并输出报告。 DAG 作为一个整体,每天都会触发。示意图:
src1 --┐
src2 --┤
... --┼---> report
srcN --┘
默认情况下,任务会等待所有上游任务成功完成。就我而言,源任务在不同时间触发并且具有不一致的运行时间。然而,我希望在 Airflow 之外获得一份反映系统最新状态的报告。因此,我想多次触发
report
任务 - 最初是在 DAG 初始化时(以创建占位符报告),稍后每当每个 src
任务完成时。
出于该问题的目的,假设所有
report
任务所做的就是提取所有上游任务的状态并将它们以 src1: {N/A, success, failure}
条目的形式存储在文件中。
为了在 Airflow 之外实现此目的,我可能会在源任务和
report
之间设置一个发布-订阅通道,并让每个任务在完成时发出通知(无论结果如何),以及订阅的报告服务对于这些通知,将在收到通知时运行。
我想到的一个解决方案是在单独的 DAG 中创建一个定期运行的任务,该任务分别查找每个
src
任务生成的一些输出文件,当文件丢失时分配 N/A。因此,实现了类似于自动更新报告的用户体验。然后,report
任务实际上与该 DAG 断开连接,并成为未连接的任务{组}的集合。
几个问题:
report
任务来实现我想要的?我考虑了report
的各种触发规则,但这些似乎只会触发
report
一次。report
使用任何机制运行,并且 report
将忽略报告中的虚拟任务。有没有更直接的方法来实现这一目标?