如何在循环中实现气流DAG

问题描述 投票:0回答:2

我刚刚开始使用 Airflow。我想在循环中设置一个 DAG,当前一个 DAG 完成时,下一个 DAG 开始。这是我想要实现的工作流程:

list_of_files = [......]
for file in list_of_files:
   dag = DAG('pipeline', default_args=default_args, schedule_interval=None)
   t1 = BashOperator('copy_this_file', ....)
   t2 = BashOperator('process_this_file', ...)
   t1.set_downstream(t2)

如果我运行

airflow backfill pipeline -s 2019-05-01
,所有 DAG 都会同时启动。

python airflow
2个回答
3
投票

DAG 不能相互依赖,它们是独立的工作流程。您希望将任务配置为相互依赖。您可以拥有一个具有多个执行分支的 DAG,每个文件一个执行分支,如下所示(未测试):

dag = DAG('pipeline', ...)
list_of_files = [......]
with dag:
    for file in list_of_files:
       t1 = BashOperator('copy_this_file', ....)
       t2 = BashOperator('process_this_file', ...)
       t1.set_downstream(t2)

0
投票

即使 DAG 是单独的工作流程,您也可以使用 TriggerDagRunOperator。您只需要下一个 DAG 的名称 dag id,因此您需要编辑循环以向前查看或仅使用索引或其他内容。轻松修复。下面的运算符示例:

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

list_of_files = [......]
for file in list_of_files:
   dag = DAG('pipeline', default_args=default_args, schedule_interval=None)
   t1 = BashOperator('copy_this_file', ....)
   t2 = BashOperator('process_this_file', ...)
   trigger_run_task = TriggerDagRunOperator(
       task_id="next_dag_trigger_run",
       trigger_dag_id="next_dag_id",
   )
   t1 >> t2 >> trigger_run_task
© www.soinside.com 2019 - 2024. All rights reserved.