在 Airflow 中组织 DAG 标签

问题描述 投票:0回答:1

我有两个 DAG:example1 和 example2。example1 的标签为 ['toy', 'umbrella'],example2 的标签为 ['toy', 'ball']。在 Airflow UI 中,example1 的标签显示为 `toy`、`umbrella`,而 example2 的标签显示为 `ball`、`toy`。如何让 example2 的标签按 `toy`、`ball` 的顺序显示?

import os
import logging
from datetime import datetime

import yaml

from airflow import DAG
from airflow.decorators import task

from lib.utils.bigquery import load_json_into_table, full_table_name
from lib.utils.cloud_storage import load_config, read_files_in_bucket
from lib.utils.slack import create_notification_operators
from airflow.models.param import Param

# Deployment environment name. It is embedded in the DAG id, the GCP project
# id and the config bucket path below, so it must be present.
ENV = os.environ.get("ENV")
if not ENV:
    # Fail fast at parse time: with ENV unset every derived name below would
    # silently contain "None" (e.g. "cat-None-toy") and point at a bucket and
    # project that presumably do not exist.
    raise RuntimeError(
        "Environment variable ENV must be set to build the toy ingestion DAG"
    )

DAG_ID = f"toy_data__ingestion__{ENV}"
PROJECT_ID = f"cat-{ENV}-toy"

# Bucket path (bucket name plus object prefix) holding the configuration files.
CONFIG_BUCKET = f"cat-{ENV}-toy/products/ref_data"
CONFIG_FILENAME = "toy_ref_data.yaml"

# Load the top-level config once, when this module is imported/parsed.
CONFIG = load_config(bucket_path=CONFIG_BUCKET, filename=CONFIG_FILENAME)


# Lazy %-style arguments: the message is only formatted if the record is emitted.
logging.info("##### The overall config loaded for %s has been loaded", DAG_ID)


# Both keys are required; a missing key raises KeyError while parsing the DAG.
REGION = CONFIG["region"]
DATASET = CONFIG["dataset_id"]

# Passed as the DAG's default_args, i.e. defaults for every task in the DAG.
default_dag_args = {
    "owner": "fancybear",
    "retries": 1,
    "project_id": PROJECT_ID,
    "region": REGION,
}


with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2023, 2, 12),
    max_active_runs=1,
    schedule=None,
    default_args=default_dag_args,
    params={
        # Names (without the ".yaml" suffix) of schema files under
        # CONFIG_BUCKET/schemas to ingest; the default ingests nothing.
        "tables_to_ingest": Param(
            [],
            type="array",
        ),
    },
    catchup=False,
    # NOTE: the Airflow UI appears to list tags alphabetically (this DAG is
    # shown with "ball" before "toy"), so the order of this list does not
    # control the order in which the tags are displayed.
    tags=["toy", "ball"],
) as trigger_dag:
    @task
    def load_data(**context):
        """Load the data of every requested table via ``load_json_into_table``.

        For each name in the ``tables_to_ingest`` run param, read
        ``<name>.yaml`` from ``CONFIG_BUCKET/schemas``, set the config's
        ``raw_file_path`` to ``gs://CONFIG_BUCKET/data/<raw_file_name>`` and
        load it into the table named by
        ``full_table_name(PROJECT_ID, DATASET, <table_id>)``.

        Raises:
            FileNotFoundError: if no file in the schemas folder matches a
                requested table name.
        """
        file_names = [
            f"{file_name}.yaml" for file_name in context["params"]["tables_to_ingest"]
        ]
        schemas_path = f"{CONFIG_BUCKET}/schemas"

        # Read and parse every schema first so that a missing or malformed
        # file fails the task before any table has been loaded.
        table_configs = []
        for file in file_names:
            matches = read_files_in_bucket(
                bucket_path=schemas_path, matching_filename=file
            )
            if not matches:
                raise FileNotFoundError(
                    f"No schema file matching {file!r} found in gs://{schemas_path}"
                )
            # Presumably matches[0] is the file's text content; only the
            # first match is used, as before.
            table_configs.append(yaml.safe_load(matches[0]))

        for config in table_configs:
            logging.info(
                "####### Loading data into table %s #######", config["table_id"]
            )
            config["raw_file_path"] = (
                f"gs://{CONFIG_BUCKET}/data/{config['raw_file_name']}"
            )

            load_json_into_table(
                config=config,
                table_id=full_table_name(PROJECT_ID, DATASET, config["table_id"]),
                region=REGION,
            )

    # Operators from the project's slack helper, set downstream of the load
    # task; their trigger rules (defined in the helper) decide when they fire.
    notify_operators = create_notification_operators(DAG_ID)

    load_data() >> notify_operators
python airflow airflow-2.x
1个回答
0
投票

我不确定您是如何用所分享的代码创建多个 DAG 的,但我猜您希望像 `DAG_ID` 变量一样,也在标签中包含 `ENV`。因此,动态构建标签列表并把 `ENV` 作为一个标签加入,应该可以解决问题,或者至少能给您一个良好的开端。(注意:Airflow UI 会按字母顺序显示 DAG 标签,所以列表中元素的顺序本身并不能决定显示顺序。)

# Sample environment name, used here only to illustrate the idea.
ENV = "ball"
# Start from the fixed tag, then append the environment name as an extra tag.
# NOTE(review): the Airflow UI appears to sort tags alphabetically, so this
# list's order does not by itself control the displayed order.
tags = ["toy"]
tags.append(ENV)
© www.soinside.com 2019 - 2024. All rights reserved.