Using a workflow to query a BigQuery Google Sheets external table and copy it to Cloud Storage


I am working on a workflow that queries a BigQuery table and exports the result directly to a Google Cloud Storage bucket. I am running into an authentication/access problem because the table I want to query is an external table backed by a Google Sheet.

There is already a question on this topic here, but I have not been able to adapt it to my code.

Here is my code so far:

- init:
    assign:
    - project_id: "my_project_id"
    - bq_dataset_export: "my_dataset"
    - bq_table_export: "my_table"

    - bq_query: >
            select
                Col1
            from
                `my_table`
    - gcs_bucket: "bucket"
    - gcs_filepath: "bucket/file.json"

- bigquery-table-to-gcs:
    call: googleapis.bigquery.v2.jobs.insert
    args:
        projectId: ${project_id}
        body:
            configuration:
                extract:
                    compression: NONE
                    destinationFormat: "NEWLINE_DELIMITED_JSON"
                    destinationUris: ['${"gs://" + gcs_bucket + "/" + gcs_filepath}']
                    sourceTable:
                        projectId: ${project_id}
                        datasetId: ${bq_dataset_export}
                        tableId: ${bq_table_export}

I know I need to add these lines (or something very close to them), but I do not know where:

call: http.post
args:
  url: ${"https://bigquery.googleapis.com/bigquery/v2/projects/"+project+"/jobs"}
  headers:
    Content-type: "application/json"
  auth:
    type: OAuth2
    scope: ["https://www.googleapis.com/auth/drive","https://www.googleapis.com/auth/cloud-platform","https://www.googleapis.com/auth/bigquery"]

The code below returns a 404 error:

- init:
    assign:
    - project_id: "my_project_id"
    - bq_dataset_export: "my_dataset"
    - bq_table_export: "my_table"

    - bq_query: >
            select
                Col1
            from
                `my_table`
    - gcs_bucket: "bucket"
    - gcs_filepath: "bucket/file.json"

- bigquery-table-to-gcs:
    call: http.post
    args:
        url: ${"https://bigquery.googleapis.com/bigquery/v2/projects/"+project_id+"/bq_query"}
        headers:
              Content-type: "application/json"
        auth:
            type: OAuth2
            scope: ["https://www.googleapis.com/auth/drive","https://www.googleapis.com/auth/cloud-platform","https://www.googleapis.com/auth/bigquery"]
        body:
            configuration:
                extract:
                    compression: NONE
                    destinationFormat: "NEWLINE_DELIMITED_JSON"
                    destinationUris: ['${"gs://" + gcs_bucket + "/" + gcs_filepath}']
                    sourceTable:
                        projectId: ${project_id}
                        datasetId: ${bq_dataset_export}
                        tableId: ${bq_table_export}
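
For what it is worth, the 404 is explained by the request path: ".../projects/{project_id}/bq_query" is not a BigQuery REST endpoint. Jobs are created by posting to the "jobs" collection, so the url line would need to read something like the following, which is what the working version in Edit #1 below ends up calling:

        url: ${"https://bigquery.googleapis.com/bigquery/v2/projects/" + project_id + "/jobs"}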

Edit #1

I now have a working workflow, although it is probably not the most efficient:

main:
    steps:
        - init:
            assign:
            - project_id: "project"
            - bq_dataset_source: "dataset_source"
            - bq_table_source: "table_source"
            - bq_dataset_export: "dataset_export"
            - bq_table_export: "table_export"

            - bq_query_source: >
                    select
                        *
                    from
                        `hardcoded_table_path`
            
            - bq_query_export: >
                    select
                        *
                    from
                        `hardcoded_table_path`

            - gcs_bucket: "bucket"
            - gcs_filepath: "file_name.json"

        - bigquery-create-export-table:
            call: BQJobsInsertJobWithSheets
            args:
                projectId: ${project_id}
                configuration:
                    query:
                        query: ${bq_query_source}
                        destinationTable:
                            projectId: ${project_id}
                            datasetId: ${bq_dataset_export}
                            tableId: ${bq_table_export}
                        createDisposition: "CREATE_IF_NEEDED"
                        writeDisposition: "WRITE_TRUNCATE"
                        allowLargeResults: true
                        useLegacySql: false
        
        - bigquery-read-export-table:
            call: googleapis.bigquery.v2.jobs.insert
            args:
                projectId: ${project_id}
                body:
                    configuration:
                        query:
                            query: ${bq_query_export}
                            destinationTable:
                                projectId: ${project_id}
                                datasetId: ${bq_dataset_export}
                                tableId: ${bq_table_export}
                            createDisposition: "CREATE_IF_NEEDED"
                            writeDisposition: "WRITE_TRUNCATE"
                            allowLargeResults: true
                            useLegacySql: false

        - bigquery-table-to-gcs:
            call: googleapis.bigquery.v2.jobs.insert
            args:
                projectId: ${project_id}
                body:
                    configuration:
                        extract:
                            compression: NONE
                            destinationFormat: "NEWLINE_DELIMITED_JSON"
                            destinationUris: ['${"gs://" + gcs_bucket + "/" + gcs_filepath}']
                            sourceTable:
                                projectId: ${project_id}
                                datasetId: ${bq_dataset_export}
                                tableId: ${bq_table_export}


#subworkflow definitions
BQJobsInsertJobWithSheets:
  params: [projectId, configuration]
  steps:
    - runJob:
        try:
          call: http.post
          args:
            url: ${"https://bigquery.googleapis.com/bigquery/v2/projects/"+projectId+"/jobs"}
            headers:
              Content-type: "application/json"
            auth:
              type: OAuth2
              scope: ["https://www.googleapis.com/auth/drive","https://www.googleapis.com/auth/cloud-platform","https://www.googleapis.com/auth/bigquery"]
            body:
              configuration: ${configuration}
          result: queryResult
        except:
          as: e
          steps:
            - UnhandledException:
                raise: ${e}
        next: queryCompleted
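    # Note: runJob jumps straight to queryCompleted on success, and the except
    # block re-raises the error, so the pageNotFound and authError steps below
    # are currently unreachable placeholders.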
    - pageNotFound:
        return: "Page not found."
    - authError:
        return: "Authentication error."
    - queryCompleted:
        return: ${queryResult.body}
Tags: authentication, google-bigquery, google-drive-api, google-workflows
1 Answer

Option 1: http.post + googleapis.bigquery.v2.jobs.query

    call: http.post
    args:
        url: ${"https://bigquery.googleapis.com/bigquery/v2/projects/"+project+"/queries"}
        headers:
            Content-type: "application/json"
        auth:
            type: OAuth2
            scope: ["https://www.googleapis.com/auth/drive","https://www.googleapis.com/auth/cloud-platform","https://www.googleapis.com/auth/bigquery"]
        body:
            query: select * from sheets.sheets_data
            timeoutMs: 200000
            useLegacySql: false
    result: response
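
If the workflow also needs the query output itself, the jobs.query response body exposes the rows as rows[].f[].v. A minimal sketch of reading the first cell from the result assigned above (the step names here are only illustrative):

    - read-first-cell:
        assign:
        - first_value: ${response.body.rows[0].f[0].v}
    - log-first-cell:
        call: sys.log
        args:
            text: ${first_value}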

Option 2: scheduled query + googleapis.bigquerydatatransfer.v1

    call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.startManualRuns
    args:
      parent: ${scheduled_query_name}
      body:
        requestedRunTime: ${time.format(sys.now())}
    result: response
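
Here scheduled_query_name is the full resource name of the transfer config behind the scheduled query, in the form projects/{project_id}/locations/{location}/transferConfigs/{config_id}. A minimal example of assigning it (the location and config ID below are placeholders):

    - init:
        assign:
        - scheduled_query_name: "projects/my_project_id/locations/us/transferConfigs/12345678-0000-0000-0000-000000000000"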