如何动态更改 Airflow DAG 中的存储桶名称?

问题描述 投票:0回答:1

我一直在使用 Airflow 成功地从存储桶中读取 SQL 文件并将其传递给

BigQueryInsertJobOperator
。这是一个展示我当前工作设置的最小示例:

with DAG(
    "my_dag",
    blah_args,
) as dag:

def get_query(file_name):
    with open("/home/airflow/gcs/data/my_sql_file_bucket/" + file_name) as f:
        f_query = f.read()
        return f_query

op1 = BigQueryInsertJobOperator(
    blah_args=blah_args,
    configuration={
        "query":{
            "query": get_query("my_sql_file.sql"),
            blah_all_the_rest
        },
    }
    )

希望您明白这个想法,这非常有效!但现在我需要根据变量动态更改我读取的存储桶,但我不知道如何让它工作:

def get_query(file_name):
    with open("/home/airflow/gcs/data/{{ var.value.my_dynamic_bucket }}" + file_name) as f:
        f_query = f.read()
        return f_query

不渲染,我明白为什么,它需要在任务中。然后我尝试了:

op1 = BigQueryInsertJobOperator(
    blah_args=blah_args,
    configuration={
        "query":{
            "query": open({{  var.value.my_dynamic_bucket  }}).read(),
            blah_all_the_rest
        },
    }
    )

但我想这和以前的错误是一样的。我什至尝试将

template_searchpath
设置为在
{{ var.value.my_dynamic_bucket  }}
中具有
default_args
的值,但它也不会在这里渲染。

如何使用 Airflow 变量来确定我需要从中读取 SQL 文件的存储桶的名称?预先感谢您。

airflow jinja2 google-cloud-composer
1个回答
0
投票

如果您的 DAG 可以使用全局范围变量,那么您可以尝试使用气流变量 https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/variables.html

定义一个变量,例如具有相关值的bucket_name。

from airflow.models import Variable


# inside your function 
Variable.get("bucket_name")
© www.soinside.com 2019 - 2024. All rights reserved.