[我正在尝试使用python代码中的模板并使用定义为here的globals()来生成气流损失”>
定义dag对象并保存。下面是我的代码:
import datetime as dt import sys import airflow from airflow.models import DAG from airflow.operators.bash_operator import BashOperator argumentList = sys.argv owner = argumentList[1] dag_name = argumentList[2] taskID = argumentList[3] bashCommand = argumentList[4] default_args = { 'owner': owner, 'start_date': dt.datetime(2019, 6, 1), 'retries': 1, 'retry_delay': dt.timedelta(minutes=5), } def dagCreate(): with DAG(dag_name, default_args=default_args, schedule_interval=None, ) as dag: print_hello = BashOperator(task_id=taskID, bash_command=bashCommand) return dag globals()[dag_name] = dagCreate()
我已将此Python代码保留在dag_folder之外,并按如下所示执行它:
python bash-dag-generator.py Airflow test_bash_generate auto_bash_task ls
但是我在气流Web服务器用户界面中看不到任何DAG。我不确定我要去哪里哪里。
我正在尝试使用python代码中的模板并使用此处定义的globals()来生成气流dag,以定义dag对象并保存。下面是我的代码:以dt import sys的形式导入datetime ...
我能够实现动态DAG的方法是使用气流变量。在下面的示例中,我有一个csv文件,其中包含Bash命令(如ls,echo等)的列表。作为read_file任务的一部分,我正在将文件位置更新为Airflow Variable。我们读取csv文件并循环执行命令的部分是创建动态DAG的位置。