动态生成气流DAG

问题描述 投票:0回答:2

[我正在尝试使用python代码中的模板并使用定义为here的globals()来生成气流损失”>

定义dag对象并保存。下面是我的代码:

import datetime as dt
import sys 

import airflow
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator

argumentList = sys.argv
owner = argumentList[1]
dag_name = argumentList[2]
taskID = argumentList[3]
bashCommand = argumentList[4]

default_args = {
    'owner': owner,
    'start_date': dt.datetime(2019, 6, 1),
    'retries': 1,
    'retry_delay': dt.timedelta(minutes=5),
}

def dagCreate():
    with DAG(dag_name,
             default_args=default_args,
             schedule_interval=None,
             ) as dag:

        print_hello = BashOperator(task_id=taskID, bash_command=bashCommand)
    return dag

globals()[dag_name] = dagCreate()

我已将此Python代码保留在dag_folder之外,并按如下所示执行它:

python bash-dag-generator.py Airflow test_bash_generate auto_bash_task ls

但是我在气流Web服务器用户界面中看不到任何DAG。我不确定我要去哪里哪里。

我正在尝试使用python代码中的模板并使用此处定义的globals()来生成气流dag,以定义dag对象并保存。下面是我的代码:以dt import sys的形式导入datetime ...

airflow
2个回答
0
投票

0
投票

我能够实现动态DAG的方法是使用气流变量。在下面的示例中,我有一个csv文件,其中包含Bash命令(如ls,echo等)的列表。作为read_file任务的一部分,我正在将文件位置更新为Airflow Variable。我们读取csv文件并循环执行命令的部分是创建动态DAG的位置。

© www.soinside.com 2019 - 2024. All rights reserved.