Trying to run a Python script on Airflow

Question

I'm trying to run a DAG in Airflow that executes a Python script in my environment. I tested the command from the same directory as the file and the logic seems correct, but in Airflow the script is never executed. I added a print statement to the first task, and the response was:

[2023-11-17, 07:00:07 UTC] {logging_mixin.py:137} INFO - The script /opt/***/src/etl/extract.py either does not exist or cannot be executed
[2023-11-17, 07:00:07 UTC] {python.py:177} INFO - Done. Returned value was: None
[2023-11-17, 07:00:07 UTC] {taskinstance.py:1323} INFO - Marking task as SUCCESS. dag_id=run_scripts_daily, task_id=run_extract, execution_date=20231117T070006, start_date=20231117T070007, end_date=20231117T070007
[2023-11-17, 07:00:07 UTC] {local_task_job.py:208} INFO - Task exited with return code 0
[2023-11-17, 07:00:07 UTC] {taskinstance.py:2578} INFO - 1 downstream tasks scheduled from follow-on schedule check

I've been trying to solve this for hours now. subprocess.run seems to be the right tool, but I can't get it configured correctly. If anyone can help, I'd be very grateful :)
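
One subtlety worth knowing about subprocess.run: when it receives an argument list together with shell=True, only the first element ('python' here) is handed to the shell, and the script path is silently dropped, so the script would not run even if the file existed. A minimal sketch of the usual call, with an illustrative path:

import subprocess
import sys

# Pass the argument list directly (no shell=True) so every element
# reaches the interpreter; check=True fails loudly on a non-zero exit.
subprocess.run([sys.executable, '/opt/airflow/src/etl/extract.py'], check=True)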

My DAG:

from airflow.decorators import dag
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import subprocess
import os

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

def run_extract():
    airflow_home = '/opt/airflow/'
    script_path = os.path.join(airflow_home, 'src/etl/extract.py')
    if os.path.exists(script_path) and os.access(script_path, os.X_OK):
        subprocess.run(['python', script_path], shell=True)
    else:
        print(f'The script {script_path} either does not exist or cannot be executed')

def run_pre_validate():
    #subprocess.run(['python', '../src/validators/pre_validate.py'])
    pass

def run_transform():
    # subprocess.run(['python', '../src/etl/transform.py'], shell=True)
    pass

def run_pos_validate():
    # subprocess.run(['python', '../src/validators/pos_validate.py'])
    pass

def run_load():
    # subprocess.run(['python', '/src/etl/load.py'])
    pass

@dag(
    "run_scripts_daily",
    start_date=datetime(2021, 12, 1),
    max_active_runs=1,
    schedule="@daily",
    default_args=default_args,
    catchup=False,
)
def run_scripts_daily():
    opr_run_extract = PythonOperator(
        task_id="run_extract",
        python_callable=run_extract
    )

    opr_run_pre_validate = PythonOperator(
        task_id="run_pre_validate",
        python_callable=run_pre_validate
    )

    opr_run_transform = PythonOperator(
        task_id="run_transform",
        python_callable=run_transform
    )

    opr_run_pos_validate = PythonOperator(
        task_id="run_pos_validate",
        python_callable=run_pos_validate
    )

    opr_run_load = PythonOperator(
        task_id="run_load",
        python_callable=run_load
    )

    opr_run_extract >> opr_run_pre_validate >> opr_run_transform >> opr_run_pos_validate >> opr_run_load

run_scripts_daily_dag = run_scripts_daily()
1 Answer

I don't know if this is the best solution, but I copied the .py scripts into the Docker container and it worked.
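
The log above shows why: os.path.exists() returned False inside the Airflow worker container, so the else branch printed its message, the callable returned None, and the task was still marked SUCCESS. The scripts must be present inside the container, either copied into the image (as in this answer) or mounted as a volume alongside the dags folder.

Once the file is in place, run_extract still has two latent issues: os.access(script_path, os.X_OK) will normally fail for a plain .py file (no executable bit is needed when the interpreter runs it), and the shell=True plus argument-list combination noted above means the script path would be ignored anyway. A hedged rewrite, assuming the same /opt/airflow/src layout as the question:

import os
import subprocess
import sys

def run_extract():
    script_path = os.path.join('/opt/airflow', 'src/etl/extract.py')
    # The file only needs to exist; it is run by the Python interpreter,
    # so no executable bit (os.X_OK) is required.
    if not os.path.isfile(script_path):
        raise FileNotFoundError(f'Script not found: {script_path}')
    # No shell=True: the argument list goes straight to the interpreter.
    # check=True raises CalledProcessError on a non-zero exit, so the
    # Airflow task is marked FAILED instead of silently succeeding.
    subprocess.run([sys.executable, script_path], check=True)

Raising instead of printing also fixes the misleading "Marking task as SUCCESS" seen in the log: a failed extract now fails the task and triggers the configured retries.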
