我正在尝试在 Airflow 中运行 DAG,它将在我的环境中执行 Python 脚本。我测试了与文件位于同一目录中的命令,逻辑似乎是正确的,但在 Airflow 中,它根本不执行脚本。我在第一个任务中添加了打印语句,响应是:
[2023-11-17, 07:00:07 UTC] {logging_mixin.py:137} INFO - The script /opt/***/src/etl/extract.py either does not exist or cannot be executed
[2023-11-17, 07:00:07 UTC] {python.py:177} INFO - Done. Returned value was: None
[2023-11-17, 07:00:07 UTC] {taskinstance.py:1323} INFO - Marking task as SUCCESS. dag_id=run_scripts_daily, task_id=run_extract, execution_date=20231117T070006, start_date=20231117T070007, end_date=20231117T070007
[2023-11-17, 07:00:07 UTC] {local_task_job.py:208} INFO - Task exited with return code 0
[2023-11-17, 07:00:07 UTC] {taskinstance.py:2578} INFO - 1 downstream tasks scheduled from follow-on schedule check
我已经尝试解决这个问题几个小时了。
subprocess.run
似乎是解决方案,但我无法正确配置它们。如果有人可以提供帮助,我将不胜感激:)
我的爸爸:
from airflow.decorators import dag
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import subprocess
import os
default_args = {
"owner": "airflow",
"depends_on_past": False,
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
def run_extract():
airflow_home = '/opt/airflow/'
script_path = os.path.join(airflow_home, 'src/etl/extract.py')
if os.path.exists(script_path) and os.access(script_path, os.X_OK):
subprocess.run(['python', script_path], shell=True)
else:
print(f'O script {script_path} não existe ou não pode ser executado')
def run_pre_validate():
#subprocess.run(['python', '../src/validators/pre_validate.py'])
pass
def run_transform():
# subprocess.run(['python', '../src/etl/transform.py'], shell=True)
pass
def run_pos_validate():
# subprocess.run(['python', '../src/validators/pos_validate.py'])
pass
def run_load():
# subprocess.run(['python', '/src/etl/load.py'])
pass
@dag(
"run_scripts_daily",
start_date=datetime(2021, 12, 1),
max_active_runs=1,
schedule="@daily",
default_args=default_args,
catchup=False,
)
def run_scripts_daily():
opr_run_extract = PythonOperator(
task_id="run_extract",
python_callable=run_extract
)
opr_run_pre_validate = PythonOperator(
task_id="run_pre_validate",
python_callable=run_pre_validate
)
opr_run_transform = PythonOperator(
task_id="run_transform",
python_callable=run_transform
)
opr_run_pos_validate = PythonOperator(
task_id="run_pos_validate",
python_callable=run_pos_validate
)
opr_run_load = PythonOperator(
task_id="run_load",
python_callable=run_load
)
opr_run_extract >> opr_run_pre_validate >> opr_run_transform >> opr_run_pos_validate >> opr_run_load
run_scripts_daily_dag = run_scripts_daily()
我不知道这是否是最好的解决方案,但我尝试将 .py 脚本复制到 docker 容器并且它有效。