我正在尝试在 Docker 上运行的 Airflow 上运行一个简单的 DAG。
我有两个 python 脚本,第一个脚本使用 API 调用获取数据,第二个脚本将数据推送到谷歌表格中。因此,我使用
t1
执行第一个 Python 脚本,并使用 t2
执行第二个。
这是我的代码:
from airflow import DAG
#from airflow.operators.python import PythonOperator, BashOperator
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['[email protected]'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=1)}
with DAG("ETL_script",
description= "DAG for Data retrieval",
default_args = default_args,
start_date= datetime(2022, 1, 1),
schedule_interval= "@weekly",
catchup = False
) as dag:
t1 = BashOperator(
task_id = 'extract_data',
bash_command = '/Users/user.name/Coding Projects/Python_Apps/Docker_Applications/Airflow_Docker_Apps/Python_Scripts/API_extract.py',
dag=dag,)
t2 = BashOperator(
task_id = "Insert_into_google_Sheets",
bash_command = 'python3 "/Users/user.name/Coding Projects/Python_Apps/Docker_Applications/Airflow_Docker_Apps/Python_Scripts/Google_Sheets_Connection.py"',
dag=dag,)
t1 >> t2 # Defining the task dependencies
当我在
zsh
上运行 Bash 命令时,我可以完美运行两个 python 脚本,但在 Airflow 上,我收到此错误:
*** Reading local file: /opt/airflow/logs/ETL_script/extract_data/2022-05-02T21:27:56.923195+00:00/1.log
[2022-05-02, 21:27:58 UTC] {taskinstance.py:1043} INFO - Dependencies all met for <TaskInstance: ETL_script.extract_data manual__2022-05-02T21:27:56.923195+00:00 [queued]>
[2022-05-02, 21:27:58 UTC] {taskinstance.py:1043} INFO - Dependencies all met for <TaskInstance: ETL_script.extract_data manual__2022-05-02T21:27:56.923195+00:00 [queued]>
[2022-05-02, 21:27:58 UTC] {taskinstance.py:1249} INFO -
--------------------------------------------------------------------------------
[2022-05-02, 21:27:58 UTC] {taskinstance.py:1250} INFO - Starting attempt 1 of 2
[2022-05-02, 21:27:58 UTC] {taskinstance.py:1251} INFO -
--------------------------------------------------------------------------------
[2022-05-02, 21:27:58 UTC] {taskinstance.py:1270} INFO - Executing <Task(BashOperator): extract_data> on 2022-05-02 21:27:56.923195+00:00
[2022-05-02, 21:27:58 UTC] {standard_task_runner.py:52} INFO - Started process 148 to run task
[2022-05-02, 21:27:58 UTC] {standard_task_runner.py:79} INFO - Running: ['***', 'tasks', 'run', 'ETL_script', 'extract_data', 'manual__2022-05-02T21:27:56.923195+00:00', '--job-id', '134', '--raw', '--subdir', 'DAGS_FOLDER/ETL_script.py', '--cfg-path', '/tmp/tmpeaprux6w', '--error-file', '/tmp/tmp1hhf5ge7']
[2022-05-02, 21:27:58 UTC] {standard_task_runner.py:80} INFO - Job 134: Subtask extract_data
[2022-05-02, 21:27:58 UTC] {logging_mixin.py:109} INFO - Running <TaskInstance: ETL_script.extract_data manual__2022-05-02T21:27:56.923195+00:00 [running]> on host 4c9d01f48299
[2022-05-02, 21:27:58 UTC] {taskinstance.py:1448} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_EMAIL=***@***.com
AIRFLOW_CTX_DAG_OWNER=***
AIRFLOW_CTX_DAG_ID=ETL_script
AIRFLOW_CTX_TASK_ID=extract_data
AIRFLOW_CTX_EXECUTION_DATE=2022-05-02T21:27:56.923195+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-05-02T21:27:56.923195+00:00
[2022-05-02, 21:27:58 UTC] {subprocess.py:62} INFO - Tmp dir root location:
/tmp
[2022-05-02, 21:27:58 UTC] {subprocess.py:74} INFO - Running command: ['bash', '-c', '/Users/user.name/Coding Projects/Python_Apps/Docker_Applications/Airflow_Docker_Apps/Python_Scripts/API_extract.py']
[2022-05-02, 21:27:58 UTC] {subprocess.py:85} INFO - Output:
[2022-05-02, 21:27:58 UTC] {subprocess.py:89} INFO - bash: /Users/user.name/Coding: No such file or directory
[2022-05-02, 21:27:58 UTC] {subprocess.py:93} INFO - Command exited with return code 127
[2022-05-02, 21:27:58 UTC] {taskinstance.py:1774} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/operators/bash.py", line 188, in execute
f'Bash command failed. The command returned a non-zero exit code {result.exit_code}.'
airflow.exceptions.AirflowException: Bash command failed. The command returned a non-zero exit code 127.
[2022-05-02, 21:27:58 UTC] {taskinstance.py:1288} INFO - Marking task as UP_FOR_RETRY. dag_id=ETL_script, task_id=extract_data, execution_date=20220502T212756, start_date=20220502T212758, end_date=20220502T212758
[2022-05-02, 21:27:58 UTC] {standard_task_runner.py:98} ERROR - Failed to execute job 134 for task extract_data (Bash command failed. The command returned a non-zero exit code 127.; 148)
[2022-05-02, 21:27:58 UTC] {local_task_job.py:154} INFO - Task exited with return code 1
[2022-05-02, 21:27:58 UTC] {local_task_job.py:264} INFO - 0 downstream tasks scheduled from follow-on schedule check
因此,根据我的理解,第一个任务
t1
失败并给了我这个错误。我想我可能没有正确定义 $PATH
,所以这给了我这个错误。这是我的第一个想法,但我不知道如何纠正。
其他信息
这是我的工作目录:
Docker_Applications
├───dags
│ ├───ETL_script.py
│
│
├───logs
├───plugins
├───Python_Scripts
│ ├───API_extract.py
│ ├───Google_Sheets_Connection.py
│
├───.env
├───docker-compose.yaml
我的
echo $PATH
是
Projects:/Library/Frameworks/Python.framework/Versions/3.10/bin:/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin:/Library/Frameworks/Python.framework/Versions/3.10/bin
我的
docker-compose.yaml
文件看起来像:
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#Documentation: https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Basic Airflow cluster configuration for CeleryExecutor with Redis and PostgreSQL.
#
# WARNING: This configuration is for local development. Do not use it in a production deployment.
#
# This configuration supports basic configuration using environment variables or an .env file
# The following variables are supported:
#
# AIRFLOW_IMAGE_NAME - Docker image name used to run Airflow.
# Default: apache/airflow:2.2.5
# AIRFLOW_UID - User ID in Airflow containers
# Default: 50000
# Those configurations are useful mostly in case of standalone testing/running Airflow in test/try-out mode
#
# _AIRFLOW_WWW_USER_USERNAME - Username for the administrator account (if requested).
# Default: airflow
# _AIRFLOW_WWW_USER_PASSWORD - Password for the administrator account (if requested).
# Default: airflow
# _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when starting all containers.
# Default: ''
#
# Feel free to modify this file to suit your needs.
---
version: '3.7'
x-airflow-common:
&airflow-common
# In order to add custom dependencies or upgrade provider packages you can use your extended image.
# Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
# and uncomment the "build" line below, Then run `docker-compose build` to build the images.
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.2.5}
# build: .
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
user: "${AIRFLOW_UID:-50000}:0"
depends_on:
&airflow-common-depends-on
redis:
condition: service_healthy
postgres:
condition: service_healthy
services:
postgres:
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres-db-volume:/var/lib/postgresql/data
healthcheck:
test: ["CMD", "pg_isready", "-U", "airflow"]
interval: 5s
retries: 5
restart: always
redis:
image: redis:latest
expose:
- 6379
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 30s
retries: 50
restart: always
airflow-webserver:
<<: *airflow-common
command: webserver
ports:
- 8080:8080
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-scheduler:
<<: *airflow-common
command: scheduler
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-worker:
<<: *airflow-common
command: celery worker
healthcheck:
test:
- "CMD-SHELL"
- 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
interval: 10s
timeout: 10s
retries: 5
environment:
<<: *airflow-common-env
# Required to handle warm shutdown of the celery workers properly
# See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
DUMB_INIT_SETSID: "0"
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-triggerer:
<<: *airflow-common
command: triggerer
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-init:
<<: *airflow-common
entrypoint: /bin/bash
# yamllint disable rule:line-length
command:
- -c
- |
function ver() {
printf "%04d%04d%04d%04d" $${1//./ }
}
airflow_version=$$(gosu airflow airflow version)
airflow_version_comparable=$$(ver $${airflow_version})
min_airflow_version=2.2.0
min_airflow_version_comparable=$$(ver $${min_airflow_version})
if (( airflow_version_comparable < min_airflow_version_comparable )); then
echo
echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
echo
exit 1
fi
if [[ -z "${AIRFLOW_UID}" ]]; then
echo
echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
echo "If you are on Linux, you SHOULD follow the instructions below to set "
echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
echo "For other operating systems you can get rid of the warning with manually created .env file:"
echo " See: https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#setting-the-right-airflow-user"
echo
fi
one_meg=1048576
mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
disk_available=$$(df / | tail -1 | awk '{print $$4}')
warning_resources="false"
if (( mem_available < 4000 )) ; then
echo
echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
echo
warning_resources="true"
fi
if (( cpus_available < 2 )); then
echo
echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
echo "At least 2 CPUs recommended. You have $${cpus_available}"
echo
warning_resources="true"
fi
if (( disk_available < one_meg * 10 )); then
echo
echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
echo
warning_resources="true"
fi
if [[ $${warning_resources} == "true" ]]; then
echo
echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
echo "Please follow the instructions to increase amount of resources available:"
echo " https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#before-you-begin"
echo
fi
mkdir -p /sources/logs /sources/dags /sources/plugins
chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
exec /entrypoint airflow version
# yamllint enable rule:line-length
environment:
<<: *airflow-common-env
_AIRFLOW_DB_UPGRADE: 'true'
_AIRFLOW_WWW_USER_CREATE: 'true'
_AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
_AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
user: "0:0"
volumes:
- .:/sources
airflow-cli:
<<: *airflow-common
profiles:
- debug
environment:
<<: *airflow-common-env
CONNECTION_CHECK_MAX_COUNT: "0"
# Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
command:
- bash
- -c
- airflow
flower:
<<: *airflow-common
command: celery flower
ports:
- 5555:5555
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
volumes:
postgres-db-volume:
注意:这个
yaml
文件与Airflow网页教程中的文件相同。请提出任何更改或最佳实践,因为我仍在学习!
非常感谢!!!!
正如您在日志中看到的,您在
line 23 ({subprocess.py:74})
API_extract.py
中定位您的 line 25 ({subprocess.py:89})
的气流不同,您可以尝试以下几项操作:
bash_command
中使用双引号而不是单引号,当前脚本在 Coding Projects
的空格中分隔,可能会导致错误:bash_command = "/Users/user.name/Coding Projects/Python_Apps/Docker_Applications/Airflow_Docker_Apps/Python_Scripts/API_extract.py",
调用 python 脚本的更好做法是导入模块并使用任务或使用
PythonOperator
。
将脚本作为模块导入,然后从任务中调用它,而不是从 BashOperator
中调用它,这对于您的情况下的 python 脚本来说是一个很好的做法,要执行此操作,您需要:
在导入标头中包含以下内容:
Import sys
sys.path.append(‘/Users/user.name/Coding Projects/Python_Apps/Docker_Applications/Airflow_Docker_Apps/Python_Scripts/’
import API_extract
import Google_Sheets_Connection
或者,您可以在
docker-compose.yaml
中为您的 PythonScripts
安装一个新卷(我主观上更喜欢此选项,以保持我的模块井井有条),在这种情况下,您通常可以直接从 PythonScripts
导入模块,无需系统:
import API_extract
import Google_Sheets_Connection
请记住将相关的
__init__.py
包含在您需要的目录中。
如果您选择后一个选项,您可以创建新的子目录,但您必须指定气流的路径,以用于您的目录:
├───Python_Scripts
│ ├─── subdir
│ ├─── subdir2
│ ├───API_extract.py
你会这样做:
import subdir.subdir2.API_extract
一旦修复了导入,您就可以重写您的任务:
@task(task_id="extract_data")
def t1():
api_extract.main() #the specific function you need actually
@task(task_id="Insert_into_google_Sheets")
def t2():
api_extract.main()
考虑到 python 脚本的解析可能会产生额外的问题,您应该遵循相同的指令来导入模块并建立路径。 (例如,在我的例子中,只有当我使用调度程序运行任务时才会触发异常,覆盖我的路径并导致类似的错误)。 您可以在这里阅读更多内容:https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/modules_management.html
希望对你有帮助