为什么我的 DAG 无法工作并标记成功?

问题描述 投票:0回答:1

我写了一个 DAG,但是当我触发它时,没有任何反应,但在 UI 中我可以看到它成功结束。这是代码

from datetime import datetime
from sqlalchemy import create_engine, text
import airflow
import requests
import requests.exceptions as requests_exceptions
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator




with DAG(
    dag_id='test',
    start_date=datetime.today(),
    schedule_interval=None
) as dag:



    def _select():
        engine = create_engine("postgresql://postgres:123456@localhost:5433/main_base")
        try:
            with engine.connect() as conn:
                statement = conn.execute(text(
                    '''
                    INSERT INTO public.main("id", "name")
                    VALUES (5, "'med'");
                    '''
                ))
                conn.commit()
                return True
        except:
            return False


    insert_data = PythonOperator(
        task_id='insert_data',
        python_callable=_select,
        dag=dag,
    )

    bash_operator = BashOperator(
        task_id='bash_operator',
        bash_command='echo vk.com',
        dag=dag,
    )



    insert_data >> bash_operator

我检查了postgres连接、表等,一切正常,当我在IDE中使用这个函数时,它工作了

python sqlalchemy airflow
1个回答
0
投票

Bash 操作符什么也不做,它只是一种调试器,我在没有他的情况下尝试过,没关系

© www.soinside.com 2019 - 2024. All rights reserved.