我写了一个 DAG,但是当我触发它时,没有任何反应,但在 UI 中我可以看到它成功结束。这是代码
from datetime import datetime
from sqlalchemy import create_engine, text
import airflow
import requests
import requests.exceptions as requests_exceptions
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
with DAG(
dag_id='test',
start_date=datetime.today(),
schedule_interval=None
) as dag:
def _select():
engine = create_engine("postgresql://postgres:123456@localhost:5433/main_base")
try:
with engine.connect() as conn:
statement = conn.execute(text(
'''
INSERT INTO public.main("id", "name")
VALUES (5, "'med'");
'''
))
conn.commit()
return True
except:
return False
insert_data = PythonOperator(
task_id='insert_data',
python_callable=_select,
dag=dag,
)
bash_operator = BashOperator(
task_id='bash_operator',
bash_command='echo vk.com',
dag=dag,
)
insert_data >> bash_operator
我检查了postgres连接、表等,一切正常,当我在IDE中使用这个函数时,它工作了
Bash 操作符什么也不做,它只是一种调试器,我在没有他的情况下尝试过,没关系