在bigquery中是否可以使用动态数据集名称

问题描述 投票:0回答:1

问题陈述:

我正在尝试在气流中使用BigqueryOperator。目的是通过动态更改数据集名称多次读取相同的查询,即数据集名称将作为参数传递。

示例:project.dataset1_layer1.tablename1project.dataset2_layer1.tablename1

期望:我想维护一个SQL副本,在其中我可以将数据集名称作为参数传递给该数据集,以代替该特定数据集。

错误消息:

我尝试将动态数据集名称作为query_params的一部分传递。但它失败,并显示以下错误消息。

查询被解析为INFO - Executing: [u'SELECT col1, col2 FROM project.@partner_layer1.tablename']

ERROR - BigQuery job failed. Final error was: {u'reason': u'invalidQuery', u'message': u'Query parameters cannot be used in place of table names at [1:37]', u'location': u'query'}. u'CREATE_IF_NEEDED', u'query': u'SELECT col1, col2 FROM project.@partner_layer1.tablename'}, u'jobType': u'QUERY'}}`

到目前为止我尝试过的事情

查询模板temp.sql如下:

SELECT col1, col2 FROM `project.@partner_layer1.tablename`;

Airflow BigqueryOperator的用法如下:

query_template_dict = {
    'partner_list' = ['val1', 'val2', 'val3', 'val4']
    'google_project': 'project_name',
    'queries': {
        'layer3': {
            'template':             'temp.sql',
            'output_dataset':       '_layer3',
            'output_tbl':           'table_{}'.format(table_date),
            'output_tbl_schema':    'temp.txt'
        }
    },
    'applicable_tasks': {
        'val1': {
            'table_layer3': []
        },
        'val2': {
            'table_layer3': []
        },
        'val3': {
            'table_layer3': []
        },
        'val4': {
            'table_layer3': []
        }

    }
}


for partner in query_template_dict['partner_list']:
    # Loop over applicable report queries for a partner
    applicable_tasks = query_template_dict['applicable_tasks'][partner].keys()
    for task in applicable_tasks:
        destination_tbl = '{}.{}{}.{}'.format(query_template_dict['google_project'], partner,
                                              query_template_dict['queries'][task]['output_dataset'] , 
                                              query_template_dict['queries'][task]['output_tbl'])
                                              }
        #Actual destination table structure
        #destination_tbl = 'project.partner_layer3.table_20200223'  
run_bq_cmd = BigQueryOperator (
                        task_id                                 =partner + '-' + task,
                        sql                                     =[query_template_dict['queries'][task]['template']],
                        destination_dataset_table               =destination_tbl,
                        use_legacy_sql                          =False,
                        write_disposition                       ='WRITE_APPEND',
                        create_disposition                      ='CREATE_IF_NEEDED',
                        allow_large_results                     =True,
                        query_params=[
                                {
                                        "name":                 "partner",
                                        "parameterType":        { "type": "STRING" },
                                        "parameterValue":       { "value": partner}
                                },

                                {
                                         "name":             "batch_date",
                                         "parameterType":    { "type": "STRING" },
                                         "parameterValue":   { "value": batch_date}
                                }
                        ],
                        dag=dag,

有人可以帮我解决这个问题吗?BigQuery是否有动态传递数据集名称的限制?

google-bigquery airflow
1个回答
0
投票

在Airflow中而不是BigQuery中替换数据集名称。

在将查询发送到BigQuery之前执行此操作-在Airflow中使用Python字符串替换。

© www.soinside.com 2019 - 2024. All rights reserved.