我是个新手 Airflow
我正在尝试执行我的第一个 DAG
使用 Oracle
运营商。但是,我得到的错误是
" Invalid Syntax "以及这个 "airflow.exceptions : dag_id could not be found.Either dag did not exist or failed parse"。不是dag不存在就是解析失败"
我已经把我的 dag
在与example_dags相同的位置。(/usr/lib/python2.7/site-packages/airflow/example_dags/my_dag.py)
但不知为何,它没有在用户界面中显示出来,所以我试着用下面的命令来执行。
airflow run example_sql_dag task_sql 2020-1-17
以下是我的oracle dag的代码。
from datetime import datetime,timedelta
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.oracle_operator import OracleOperator
default_args = {
'owner': 'Airflow',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(2),
'start_date': datetime(2020,01,17),
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(minutes=5)
}
with DAG('example_sql_dag',
default_args=default_args,
catchup=False,
schedule_interval='*/10 * * * *'
) as dag:
opr_sql = OracleOperator(task_id='task_sql',
oracle_conn_id='Oracle_schema'
sql= 'insert into table1 (a,b,c)values (1,2,3)',
autocommit ='True')
谢谢你!
编辑:我试着直接执行python文件,到文件存放的位置,然后这样做:.my_dag.py。
但还是没戏,这次我得到了Command not found的错误,可能是脚本出了问题,请大家指教
你的DAG定义中的'oracle_conn_id'和'sql'参数之间缺少一个","。
opr_sql = OracleOperator(task_id='task_sql',
oracle_conn_id='Oracle_schema',
sql= 'insert into table1 (a,b,c)values (1,2,3)',
autocommit ='True')