我一直在使用 Airflow 成功地从存储桶中读取 SQL 文件并将其传递给
BigQueryInsertJobOperator
。这是一个展示我当前工作设置的最小示例:
with DAG(
"my_dag",
blah_args,
) as dag:
def get_query(file_name):
with open("/home/airflow/gcs/data/my_sql_file_bucket/" + file_name) as f:
f_query = f.read()
return f_query
op1 = BigQueryInsertJobOperator(
blah_args=blah_args,
configuration={
"query":{
"query": get_query("my_sql_file.sql"),
blah_all_the_rest
},
}
)
希望您明白这个想法,这非常有效!但现在我需要根据变量动态更改我读取的存储桶,但我不知道如何让它工作:
def get_query(file_name):
with open("/home/airflow/gcs/data/{{ var.value.my_dynamic_bucket }}" + file_name) as f:
f_query = f.read()
return f_query
不渲染,我明白为什么,它需要在任务中。然后我尝试了:
op1 = BigQueryInsertJobOperator(
blah_args=blah_args,
configuration={
"query":{
"query": open({{ var.value.my_dynamic_bucket }}).read(),
blah_all_the_rest
},
}
)
但我想这和以前的错误是一样的。我什至尝试将
template_searchpath
设置为在 {{ var.value.my_dynamic_bucket }}
中具有 default_args
的值,但它也不会在这里渲染。
如何使用 Airflow 变量来确定我需要从中读取 SQL 文件的存储桶的名称?预先感谢您。
如果您的 DAG 可以使用全局范围变量,那么您可以尝试使用气流变量 https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/variables.html
定义一个变量,例如具有相关值的bucket_name。
from airflow.models import Variable
# inside your function
Variable.get("bucket_name")