如何通过 Airflow 的 DockerOperator 使用来自 AWS ECR 的私有映像?

问题描述 投票:0回答:2

我使用 CLI 创建了一个自定义包(使用 Click 构建)。该包可以做两件事:运行预处理和运行机器学习模型。 我创建了该客户包的 Docker 映像,并将其推送到 AWS (ECR) 上的私有注册表。

现在我想使用 Airflow 运行这个容器,我想在 EC2 实例上运行它。我正在使用 docker-compose 运行它。

对于这个示例,我将只关注一项任务:运行容器进行预处理。

但是,现在我得到 t2 的“上游失败”。

from datetime import timedelta
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.docker_operator import DockerOperator


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(1),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'example_pipeline',
    default_args=default_args,
    description='example data pipeline.',
    schedule_interval=timedelta(minutes=3)
)


t1 = BashOperator(
    task_id='starting_airflow',
    bash_command='echo "Starting Airflow DAG..."',
    dag=dag,
)


t2 = DockerOperator(
    task_id='data_pipeline',
    image='XXXXXXXX.dkr.ecr.eu-central-1.amazonaws.com/rwg:latest',
    container_name='task__export_data',
    command="run-preprocessing",
    network_mode="bridge",
    api_version="auto",
    docker_url="unix://var/run/docker.sock",
    docker_conn_id='aws_con',
    dag=dag
)

t1 >> t2

我通过 UI 创建了“aws_con”。但好像没有效果。

此外,这是我的 docker-compose.yml 文件。

version: '3'
services:
  postgres:
    image: postgres:9.6
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    ports:
      - "5432:5432"
    volumes:
      - ./pgdata:/var/lib/postgresql/data

  webserver:
    build: .
    restart: always
    depends_on:
      - postgres
    environment:
      - LOAD_EX=n
      - EXECUTOR=Local
      - FERNET_KEY=jsDPRErfv8Z_eVTnGfF8ywd19j4pyqE3NpdUBA_oRTo=
    volumes:
      - ./dags:/usr/local/airflow/dags
    ports:
      - "8080:8080"
    command: webserver
    healthcheck:
      test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
      interval: 30s
      timeout: 30s
      retries: 3

我在 Docker Operator 方面做错了什么?

第二个问题:如何通过代码或 cli 创建这个“aws_con”?

python amazon-web-services docker airflow amazon-ecr
2个回答
2
投票

您应该指定 docker 连接。

这是在 Airflow 中传递凭证的标准方法,并且 Docker 有一个专用的 Docker 连接,您可以在 Airlfow DB 或 Secrets 中定义该连接,并将连接的 id 作为

docker_conn_id
- 参数传递给 DockerOperator(您也可以指定url 在那里,所以你不需要在你的操作符中传递 docker_url)。

请参阅此处的 Python API:

https://airflow.apache.org/docs/apache-airflow-providers-docker/stable/_api/airflow/providers/docker/operators/docker/index.html

这里还有关于 Docker 连接的单独页面(

docker_conn_id
描述链接到此页面):

https://airflow.apache.org/docs/apache-airflow-providers-docker/stable/connections/docker.html#howto-connection-docker


0
投票

此问题的前一个answer指出创建 Docker 连接,这是可行的方法,但忽略了 AWS ECR 的凭证是临时的这一点。

解决方案是创建一个 DAG,它将更新

docker_conn_id
的 ECR 凭证,然后
docker_conn_id
可以在 DockerOperator 中使用。

我的来源是博客文章GitHub问题

import base64
from datetime import datetime

from airflow import DAG, settings
from airflow.models import Connection
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.ecr import EcrHook


def update_credentials() -> None:
    """Retrieves ECR access token and updates Airflow docker_ecr connection"""
    hook = EcrHook(aws_conn_id="aws_default")
    conn_id = "docker_ecr"

    response = hook.get_client_type().get_authorization_token()

    username, password = (
        base64.b64decode(response["authorizationData"][0]["authorizationToken"])
        .decode("utf-8")
        .split(":")
    )

    registry_url = response["authorizationData"][0]["proxyEndpoint"]

    conn = Connection(
        conn_id=conn_id,
        conn_type="docker",
        host=registry_url,
        login=username,
        password=password,
    )
    with settings.Session() as session:
        session.query(Connection).filter(Connection.conn_id == conn_id).delete()
        session.add(conn)
        session.commit()


default_args = {
    "owner": "airflow",
    "description": "Fetches and stores ECR credentials to allow Docker daemon to pull images",
    "depend_on_past": False,
    "start_date": datetime(2024, 5, 16),
    "email_on_failure": False,
    "email_on_retry": False,
}

with DAG(
    "update_ecr_credentials",
    default_args=default_args,
    schedule_interval="@hourly",
    catchup=False,
) as dag:
    update_credentials_task = PythonOperator(
        task_id="update_credentials", python_callable=update_credentials
    )

运行 DAG 后,您可以在 DockerOperator 参数中设置

docker_conn_id="docker_ecr"
- source

对于您的 Airflow 实例,您需要在 AWS 上获得以下权限才能获取图像:

ecr:BatchCheckLayerAvailability
ecr:BatchGetImage, ecr:GetAuthorizationToken
ecr:GetDownloadUrlForLayer

我正在使用来自

apache-airflow-providers-amazon
EcrHook,如果您不想使用它,只需将行
response = hook.get_client_type().get_authorization_token()
更改为
response = boto3.client("ecr").get_authorization_token()
。但随后您需要自行管理 AWS 凭证。

© www.soinside.com 2019 - 2024. All rights reserved.