Airflow Oracle Operator

问题描述 投票:1回答:1

我是个新手 Airflow 我正在尝试执行我的第一个 DAG 使用 Oracle 运营商。但是,我得到的错误是

" Invalid Syntax "以及这个 "airflow.exceptions : dag_id could not be found.Either dag did not exist or failed parse"。不是dag不存在就是解析失败"

我已经把我的 dag 在与example_dags相同的位置。(/usr/lib/python2.7/site-packages/airflow/example_dags/my_dag.py) 但不知为何,它没有在用户界面中显示出来,所以我试着用下面的命令来执行。

airflow run example_sql_dag task_sql 2020-1-17

以下是我的oracle dag的代码。

from datetime import datetime,timedelta

import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.oracle_operator import OracleOperator

default_args = {
'owner': 'Airflow',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(2),
'start_date': datetime(2020,01,17),   
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(minutes=5)

}


with DAG('example_sql_dag',
     default_args=default_args,
     catchup=False,
     schedule_interval='*/10 * * * *'
     ) as dag:
opr_sql = OracleOperator(task_id='task_sql',
                                       oracle_conn_id='Oracle_schema'
                                       sql= 'insert into table1 (a,b,c)values (1,2,3)',
                                               autocommit ='True')

谢谢你!

编辑:我试着直接执行python文件,到文件存放的位置,然后这样做:.my_dag.py。

但还是没戏,这次我得到了Command not found的错误,可能是脚本出了问题,请大家指教

oracle airflow operator-keyword
1个回答
1
投票

你的DAG定义中的'oracle_conn_id'和'sql'参数之间缺少一个","。

opr_sql = OracleOperator(task_id='task_sql',
                                       oracle_conn_id='Oracle_schema',
                                       sql= 'insert into table1 (a,b,c)values (1,2,3)',
                                               autocommit ='True')
© www.soinside.com 2019 - 2024. All rights reserved.