Airflow DAG has a task that uses BashOperator to run a bash script that runs a sqlplus query; the script fails with "Error 45 initializing SQL*Plus"

Problem description · Votes: 0 · Answers: 0

The bash script runs without errors when I launch it from the command line, but when I run it from a task in my Airflow DAG it fails with:

Error 45 initializing SQL*Plus

The description for Error 45 says it can result "from using a non-local file system that is incompatible with SQL*Plus".

However, it runs fine from the command line.

Is there a way to run the script from my task without getting this error, or is there a better way to write the task?
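Since the same script behaves differently under Airflow than in a terminal, a first step is usually to compare the two environments. This is a hypothetical diagnostic (not from the original post): run these lines as the `bash_command` of a throwaway BashOperator task and diff the task log against the same commands run by hand.

```shell
# Print the parts of the environment SQL*Plus cares about; compare this
# output (from the Airflow task log) with an interactive shell session.
echo "USER=$(id -un)"
echo "HOME=${HOME:-unset}"
echo "PWD=$(pwd)"
echo "ORACLE_HOME=${ORACLE_HOME:-unset}"
echo "TNS_ADMIN=${TNS_ADMIN:-unset}"
```

Error 45 is often down to one of these differing: the task may run as a different user, with no usable `HOME`, or from a working directory the Airflow worker picked (e.g. a temp dir on a network mount).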

DAG file - User_Activity_Report.py

# DAG: User_Activity_Report
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.email import send_email
from datetime import datetime, timedelta

currtime = datetime.now().strftime("%m/%d/%Y %H:%M")
datestr  = datetime.now().strftime("%Y%m%d")
# Report Output file
outputFile = 'UAR_' + datestr + '.txt'
# Shell Script to execute
shellFolder = '/home/airflow/airflow/STL/scripts'
shellScript = 'user_activity_report.sh'
# Email address of notification recipient
email_addr = '[email protected]'

# Email 1 - Send if sqlplus shell script fails (returns non-zero)
def notify_email_script_failed(contextDict, **kwargs):
    """Send custom email alert"""
    title = "** Error Running User Activity Report **"
    body = f"""\
    USER ACTIVITY REPORT <br><br>
        There was an error generating the User Activity Report. <br><br>
        Please review the DAG Run information in the Airflow UI for details. <br>
        DAG ID: User_Activity_Report <br><br><br><br>
        System Date/Time - {currtime} <br><br>
        On-Demand Airflow <br>
    """
    send_email(email_addr, title, body)

# Email 2 - Send if Report runs successfully (script returns 0)
def notify_email_success(contextDict, **kwargs):
    """Send custom email alert"""
    title = "User Activity Report"
    body = f"""\
    USER ACTIVITY REPORT <br><br>
        The daily User Activity Report is attached. <br><br><br><br>
        System Date/Time - {currtime} <br><br>
        On-Demand Airflow <br>
    """
    send_email(email_addr, title, body)

default_args = {
        'owner' : 'OnDemand',
        'depends_on_past' : False,
        'email' : [email_addr],
        'email_on_failure' : True,
        'email_on_retry' : False,
        'retries' : 3,
        'retry_delay' : timedelta(minutes = 5),
}

# DAG Definition - Run every weekday (M - F) at 1 minute after midnight; Retries: 3; Retry Delay: 5 min; Send email on failure
dag=DAG(dag_id='User_Activity_Report', default_args=default_args, start_date=datetime(2022,12,15),
        schedule_interval="1 0 * * 1-5", catchup=False, tags=["OnDemand", "Daily", "User_Activity"])

# Task - Run bash script that calls sqlplus to execute the query and write the results to the filename passed as a parameter
cmd = 'sh {}/{} {} '.format(shellFolder, shellScript, outputFile)

Run_SqlPlus_Script = BashOperator(
        task_id = 'Run_SqlPlus_Script',
        bash_command = cmd,
        on_success_callback = notify_email_success,
        on_failure_callback = notify_email_script_failed,
        dag=dag,
)

# Set Workflow Task Dependencies
start_task = DummyOperator( task_id= "start", dag=dag )
stop_task  = DummyOperator( task_id= "stop",  dag=dag )

start_task >> Run_SqlPlus_Script >> stop_task
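One common workaround (an assumption on my part, not something from the post) is to run the script through a login shell so the task sources the airflow user's profile the same way an interactive session does. The `bash_command` would then wrap the call in `bash -l -c`:

```shell
# Hypothetical bash_command for the BashOperator: -l makes bash a login
# shell, which sources ~/.bash_profile, so ORACLE_HOME/TNS_ADMIN/PATH
# match what an interactive run of the script sees.
bash -l -c 'echo "PATH as seen by a login shell: $PATH"'
# For the real task, the wrapped command would be the report script, e.g.:
# bash -l -c '/home/airflow/airflow/STL/scripts/user_activity_report.sh UAR_test.txt'
```

This only helps if the command-line runs worked because of something exported in the profile; if the difference is the working directory instead, the script itself needs to `cd` somewhere SQL*Plus can use.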

Bash script - user_activity_report.sh

#!/bin/bash
# Script: user_activity_report.sh
export ORACLE_HOME=/usr/lib/oracle/12.1/client64
export PATH=$PATH:$ORACLE_HOME/bin
export TNS_ADMIN=$ORACLE_HOME/network/admin
export LD_LIBRARY_PATH=$ORACLE_HOME/lib

sqlplus <user>/<password>@//xxx-xxxx.xxxxxxxxxxxx.ca-central-1.rds.amazonaws.com:1521/ORAC @user_activity_query.sql > "$1"

exit $?
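A sketch of a more location-independent preamble for the script (my assumption: `user_activity_query.sql` sits next to the script, which is why `@user_activity_query.sql` resolves from a terminal but not from an Airflow worker whose working directory is elsewhere):

```shell
#!/bin/bash
# Hypothetical preamble for user_activity_report.sh: pin the working
# directory to the script's own directory and make sure HOME is usable,
# since SQL*Plus reads startup files relative to both.
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"  # absolute dir of this script
cd "$SCRIPT_DIR" || exit 1                   # so @user_activity_query.sql resolves
export HOME="${HOME:-$SCRIPT_DIR}"           # give SQL*Plus a readable HOME
# ...then export ORACLE_HOME/PATH/TNS_ADMIN/LD_LIBRARY_PATH and
# invoke sqlplus exactly as before.
```

Using `@"$SCRIPT_DIR/user_activity_query.sql"` in the sqlplus call would achieve the same thing without changing the working directory.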

SQL query - user_activity_query.sql

SELECT
  u.user_id as "USER_ID",
  u.authority_id as "AUTHORITY_ID",
  u.first_names as "FIRST_NAME",
  u.surname as "LAST_NAME",
  s.function_code as "FUNCTION",
  s.client_app_id as "CLIENT_APP",
  to_char(s.datetime,'MM-DD-YYYY HH24:MI') as "LOGIN DATE"
FROM usrr u LEFT JOIN sm_ulog s
  ON (u.user_id = s.user_id
  AND TRUNC(s.datetime) >= TRUNC(SYSTIMESTAMP - INTERVAL '60' DAY)
  AND s.function_code = 'SUCCESSFUL_LOGIN')
WHERE
  u.user_status = 1
ORDER BY
  u.user_id, s.datetime, s.client_app_id;
Tags: bash, airflow, sqlplus