问题陈述:
我正在尝试在气流中使用BigqueryOperator。目的是通过动态更改数据集名称多次读取相同的查询,即数据集名称将作为参数传递。
示例:project.dataset1_layer1.tablename1
,project.dataset2_layer1.tablename1
期望:我想维护一个SQL副本,在其中我可以将数据集名称作为参数传递给该数据集,以代替该特定数据集。
错误消息:
我尝试将动态数据集名称作为query_params的一部分传递。但它失败,并显示以下错误消息。
查询被解析为INFO - Executing: [u'SELECT col1, col2 FROM project.@partner_layer1.tablename']
ERROR - BigQuery job failed. Final error was: {u'reason': u'invalidQuery', u'message': u'Query parameters cannot be used in place of table names at [1:37]', u'location': u'query'}. u'CREATE_IF_NEEDED', u'query': u'SELECT col1, col2 FROM project.@partner_layer1.tablename'}, u'jobType': u'QUERY'}}
`
到目前为止我尝试过的事情
查询模板temp.sql
如下:
SELECT col1, col2 FROM `project.@partner_layer1.tablename`;
Airflow BigqueryOperator的用法如下:
query_template_dict = {
'partner_list' = ['val1', 'val2', 'val3', 'val4']
'google_project': 'project_name',
'queries': {
'layer3': {
'template': 'temp.sql',
'output_dataset': '_layer3',
'output_tbl': 'table_{}'.format(table_date),
'output_tbl_schema': 'temp.txt'
}
},
'applicable_tasks': {
'val1': {
'table_layer3': []
},
'val2': {
'table_layer3': []
},
'val3': {
'table_layer3': []
},
'val4': {
'table_layer3': []
}
}
}
for partner in query_template_dict['partner_list']:
# Loop over applicable report queries for a partner
applicable_tasks = query_template_dict['applicable_tasks'][partner].keys()
for task in applicable_tasks:
destination_tbl = '{}.{}{}.{}'.format(query_template_dict['google_project'], partner,
query_template_dict['queries'][task]['output_dataset'] ,
query_template_dict['queries'][task]['output_tbl'])
}
#Actual destination table structure
#destination_tbl = 'project.partner_layer3.table_20200223'
run_bq_cmd = BigQueryOperator (
task_id =partner + '-' + task,
sql =[query_template_dict['queries'][task]['template']],
destination_dataset_table =destination_tbl,
use_legacy_sql =False,
write_disposition ='WRITE_APPEND',
create_disposition ='CREATE_IF_NEEDED',
allow_large_results =True,
query_params=[
{
"name": "partner",
"parameterType": { "type": "STRING" },
"parameterValue": { "value": partner}
},
{
"name": "batch_date",
"parameterType": { "type": "STRING" },
"parameterValue": { "value": batch_date}
}
],
dag=dag,
有人可以帮我解决这个问题吗?BigQuery是否有动态传递数据集名称的限制?
在Airflow中而不是BigQuery中替换数据集名称。
在将查询发送到BigQuery之前执行此操作-在Airflow中使用Python字符串替换。