我使用 CLI 创建了一个自定义包(使用 Click 构建)。该包可以做两件事:运行预处理和运行机器学习模型。 我创建了该客户包的 Docker 映像,并将其推送到 AWS (ECR) 上的私有注册表。
现在我想使用 Airflow 运行这个容器,我想在 EC2 实例上运行它。我正在使用 docker-compose 运行它。
对于这个示例,我将只关注一项任务:运行容器进行预处理。
但是,现在我得到 t2 的“上游失败”。
from datetime import timedelta
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.docker_operator import DockerOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(1),
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'example_pipeline',
default_args=default_args,
description='example data pipeline.',
schedule_interval=timedelta(minutes=3)
)
t1 = BashOperator(
task_id='starting_airflow',
bash_command='echo "Starting Airflow DAG..."',
dag=dag,
)
t2 = DockerOperator(
task_id='data_pipeline',
image='XXXXXXXX.dkr.ecr.eu-central-1.amazonaws.com/rwg:latest',
container_name='task__export_data',
command="run-preprocessing",
network_mode="bridge",
api_version="auto",
docker_url="unix://var/run/docker.sock",
docker_conn_id='aws_con',
dag=dag
)
t1 >> t2
我通过 UI 创建了“aws_con”。但好像没有效果。
此外,这是我的 docker-compose.yml 文件。
version: '3'
services:
postgres:
image: postgres:9.6
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
ports:
- "5432:5432"
volumes:
- ./pgdata:/var/lib/postgresql/data
webserver:
build: .
restart: always
depends_on:
- postgres
environment:
- LOAD_EX=n
- EXECUTOR=Local
- FERNET_KEY=jsDPRErfv8Z_eVTnGfF8ywd19j4pyqE3NpdUBA_oRTo=
volumes:
- ./dags:/usr/local/airflow/dags
ports:
- "8080:8080"
command: webserver
healthcheck:
test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
interval: 30s
timeout: 30s
retries: 3
我在 Docker Operator 方面做错了什么?
第二个问题:如何通过代码或 cli 创建这个“aws_con”?
您应该指定 docker 连接。
这是在 Airflow 中传递凭证的标准方法,并且 Docker 有一个专用的 Docker 连接,您可以在 Airlfow DB 或 Secrets 中定义该连接,并将连接的 id 作为
docker_conn_id
- 参数传递给 DockerOperator(您也可以指定url 在那里,所以你不需要在你的操作符中传递 docker_url)。
请参阅此处的 Python API:
这里还有关于 Docker 连接的单独页面(
docker_conn_id
描述链接到此页面):
此问题的前一个answer指出创建 Docker 连接,这是可行的方法,但忽略了 AWS ECR 的凭证是临时的这一点。
解决方案是创建一个 DAG,它将更新
docker_conn_id
的 ECR 凭证,然后 docker_conn_id
可以在 DockerOperator 中使用。
import base64
from datetime import datetime
from airflow import DAG, settings
from airflow.models import Connection
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.ecr import EcrHook
def update_credentials() -> None:
"""Retrieves ECR access token and updates Airflow docker_ecr connection"""
hook = EcrHook(aws_conn_id="aws_default")
conn_id = "docker_ecr"
response = hook.get_client_type().get_authorization_token()
username, password = (
base64.b64decode(response["authorizationData"][0]["authorizationToken"])
.decode("utf-8")
.split(":")
)
registry_url = response["authorizationData"][0]["proxyEndpoint"]
conn = Connection(
conn_id=conn_id,
conn_type="docker",
host=registry_url,
login=username,
password=password,
)
with settings.Session() as session:
session.query(Connection).filter(Connection.conn_id == conn_id).delete()
session.add(conn)
session.commit()
default_args = {
"owner": "airflow",
"description": "Fetches and stores ECR credentials to allow Docker daemon to pull images",
"depend_on_past": False,
"start_date": datetime(2024, 5, 16),
"email_on_failure": False,
"email_on_retry": False,
}
with DAG(
"update_ecr_credentials",
default_args=default_args,
schedule_interval="@hourly",
catchup=False,
) as dag:
update_credentials_task = PythonOperator(
task_id="update_credentials", python_callable=update_credentials
)
运行 DAG 后,您可以在 DockerOperator 参数中设置
docker_conn_id="docker_ecr"
- source。
对于您的 Airflow 实例,您需要在 AWS 上获得以下权限才能获取图像:
ecr:BatchCheckLayerAvailability
、ecr:BatchGetImage, ecr:GetAuthorizationToken
和 ecr:GetDownloadUrlForLayer
。
我正在使用来自
apache-airflow-providers-amazon的
EcrHook
,如果您不想使用它,只需将行 response = hook.get_client_type().get_authorization_token()
更改为 response = boto3.client("ecr").get_authorization_token()
。但随后您需要自行管理 AWS 凭证。