我有两个 DAG:example1 和 example2。example1 的标签为 ['toy', 'umbrella'],example2 的标签为 ['toy', 'ball']。在 Airflow UI 中,example1 的标签显示为 `toy` `umbrella`,而 example2 的标签显示为 `ball` `toy`。如何让 example2 的标签按 `toy` `ball` 的顺序显示?以下是 example2 的 DAG 代码:
import os
import logging
from datetime import datetime
import yaml
from airflow import DAG
from airflow.decorators import task
from lib.utils.bigquery import load_json_into_table, full_table_name
from lib.utils.cloud_storage import load_config, read_files_in_bucket
from lib.utils.slack import create_notification_operators
from airflow.models.param import Param
# --- Environment-derived identifiers -------------------------------------
# ENV comes from the process environment. If it is unset, os.getenv returns
# None and the f-strings below embed the literal text "None".
ENV = os.getenv("ENV")
DAG_ID = f"toy_data__ingestion__{ENV}"
PROJECT_ID = f"cat-{ENV}-toy"

# --- Reference-data configuration ----------------------------------------
# Bucket path (bucket name plus prefix) that holds the configuration files.
CONFIG_BUCKET = f"cat-{ENV}-toy/products/ref_data"
CONFIG_FILENAME = "toy_ref_data.yaml"

# Executed at module import time, i.e. every time this file is imported.
CONFIG = load_config(bucket_path=CONFIG_BUCKET, filename=CONFIG_FILENAME)
logging.info(f"##### The overall config loaded for {DAG_ID} has been loaded")

# Both values are required keys of the loaded config (KeyError otherwise).
REGION, DATASET = CONFIG["region"], CONFIG["dataset_id"]

# Passed to DAG(default_args=...) when the DAG is defined.
default_dag_args = dict(
    owner="fancybear",
    retries=1,
    project_id=PROJECT_ID,
    region=REGION,
)
def _read_table_config(table_name):
    """Load and parse the YAML schema config for a single table.

    Looks up ``f"{table_name}.yaml"`` under ``CONFIG_BUCKET + "/schemas"``
    and parses the first match with ``yaml.safe_load``.

    Args:
        table_name: A table name taken from the ``tables_to_ingest`` param.

    Returns:
        The parsed YAML document for that table.

    Raises:
        FileNotFoundError: If ``read_files_in_bucket`` returns no match.
    """
    file_name = f"{table_name}.yaml"
    schemas_path = CONFIG_BUCKET + "/schemas"
    # NOTE(review): read_files_in_bucket presumably returns a list of the
    # contents of matching files; only the first match is used.
    matches = read_files_in_bucket(
        bucket_path=schemas_path, matching_filename=file_name
    )
    if not matches:
        # Fail with an actionable message instead of a bare IndexError.
        raise FileNotFoundError(
            f"No schema file matching {file_name!r} found in {schemas_path!r}"
        )
    return yaml.safe_load(matches[0])


with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2023, 2, 12),
    max_active_runs=1,
    schedule=None,
    default_args=default_dag_args,
    params={
        # Table names to ingest; each maps to "<name>.yaml" in the schemas folder.
        "tables_to_ingest": Param(
            [],
            type="array",
        ),
    },
    catchup=False,
    # NOTE: the Airflow UI appears to display tags sorted by name (e.g.
    # ["toy", "ball"] is rendered as "ball toy"), so the order of this list
    # does not control the order in which the tags are shown.
    tags=["toy", "ball"],
) as trigger_dag:

    @task
    def load_data(**context):
        """Ingest every table named in the ``tables_to_ingest`` DAG param.

        All table configs are read and parsed first, so a missing or invalid
        schema file fails the task before any table is loaded. Each config is
        then given a ``raw_file_path`` (the ``gs://`` URI of its raw data file)
        and passed to ``load_json_into_table``.

        Raises:
            FileNotFoundError: If a requested table has no schema file.
        """
        table_configs = [
            _read_table_config(table_name)
            for table_name in context["params"]["tables_to_ingest"]
        ]
        for config in table_configs:
            logging.info(
                "####### Loading data into table %s #######", config["table_id"]
            )
            config["raw_file_path"] = (
                f"gs://{CONFIG_BUCKET}/data/{config['raw_file_name']}"
            )
            load_json_into_table(
                config=config,
                table_id=full_table_name(PROJECT_ID, DATASET, config["table_id"]),
                region=REGION,
            )

    notify_operators = create_notification_operators(DAG_ID)
    load_data() >> notify_operators
我不确定您如何用所分享的代码创建多个 DAG,但我假设您也希望像 `DAG_ID` 变量那样,在标签中包含 `ENV`。因此,动态构建标签列表并把 `ENV` 作为一个标签加入,应该可以解决问题,或者至少能给您一个良好的开端。(注意:从问题中描述的现象看,Airflow UI 会按名称的字母顺序显示标签,因此列表中的顺序本身并不决定显示顺序。)例如:
# Example: add the environment name as a tag next to the fixed "toy" tag.
ENV = "ball"
tags = ["toy"] + [ENV]