How do I request paginated BigQuery query results using pageToken and the Google Client Library for Java?


I want to run a BigQuery query whose total result is several thousand rows, but I only want to retrieve one page of 100 results at a time (using the maxResults and pageToken parameters).

The BigQuery API supports a pageToken parameter on the collection.list methods. However, I am running an asynchronous query and retrieving the results with the getQueryResults method, which does not appear to support a pageToken parameter. Is it possible to use pageToken with getQueryResults?

google-bigquery google-api-java-client
2 Answers
14 votes

Update: there is new documentation on how to page through list results here.

I'm answering my own question, since a developer asked me this privately and I wanted to share the answer on Stack Overflow.

The pageToken parameter can be used when requesting paginated results from the Tabledata.list method. For example, results are paginated automatically when the result data exceeds 100k rows or 10 MB. You can also request paginated results by explicitly setting the maxResults parameter. Each page of results returns a pageToken, which can then be used to retrieve the next page of results.

Every query produces a new BigQuery table. If you don't explicitly name that table, it only persists for 24 hours. However, even an unnamed "anonymous" table has an identifier. Either way, after inserting the query job, retrieve the name of the newly created table. Then use the tabledata.list method (along with a combination of the maxResults/pageToken parameters) to request the results in pages. Loop and keep calling tabledata.list with the previously retrieved pageToken until no pageToken is returned, meaning you have reached the last page.

Using the Google API Client Library for Java, the code to insert a query job, poll for query completion, and then retrieve the query results page by page might look like this:

// Create a new BigQuery client authorized via OAuth 2.0 protocol
// See: https://developers.google.com/bigquery/docs/authorization#installed-applications
Bigquery bigquery = createAuthorizedClient();

// Start a Query Job
String querySql = "SELECT TOP(word, 500), COUNT(*) FROM publicdata:samples.shakespeare";
JobReference jobId = startQuery(bigquery, PROJECT_ID, querySql);

// Poll for Query Results, return result output
TableReference completedJob = checkQueryResults(bigquery, PROJECT_ID, jobId);

// Return and display the results of the Query Job
displayQueryResults(bigquery, completedJob);

/**
 * Inserts a Query Job for a particular query
 */
public static JobReference startQuery(Bigquery bigquery, String projectId,
                                      String querySql) throws IOException {
  System.out.format("\nInserting Query Job: %s\n", querySql);

  Job job = new Job();
  JobConfiguration config = new JobConfiguration();
  JobConfigurationQuery queryConfig = new JobConfigurationQuery();
  config.setQuery(queryConfig);

  job.setConfiguration(config);
  queryConfig.setQuery(querySql);

  Insert insert = bigquery.jobs().insert(projectId, job);
  insert.setProjectId(projectId);
  JobReference jobId = insert.execute().getJobReference();

  System.out.format("\nJob ID of Query Job is: %s\n", jobId.getJobId());

  return jobId;
}

/**
 * Polls the status of a BigQuery job, returns TableReference to results if "DONE"
 */
private static TableReference checkQueryResults(Bigquery bigquery, String projectId, JobReference jobId)
    throws IOException, InterruptedException {
  // Variables to keep track of total query time
  long startTime = System.currentTimeMillis();
  long elapsedTime;

  while (true) {
    Job pollJob = bigquery.jobs().get(projectId, jobId.getJobId()).execute();
    elapsedTime = System.currentTimeMillis() - startTime;
    System.out.format("Job status (%dms) %s: %s\n", elapsedTime,
        jobId.getJobId(), pollJob.getStatus().getState());
    if (pollJob.getStatus().getState().equals("DONE")) {
      return pollJob.getConfiguration().getQuery().getDestinationTable();
    }
    // Pause execution for one second before polling job status again, to
    // reduce unnecessary calls to the BigQuery API and lower overall
    // application bandwidth.
    Thread.sleep(1000);
  }
}

/**
 * Page through the result set
 */
private static void displayQueryResults(Bigquery bigquery,
                                        TableReference completedJob) throws IOException {
  long maxResults = 20;
  String pageToken = null;
  int page = 1;

  // Default to not looping
  boolean moreResults = false;

  do {
    TableDataList queryResult = bigquery.tabledata().list(
            completedJob.getProjectId(),
            completedJob.getDatasetId(),
            completedJob.getTableId())
        .setMaxResults(maxResults)
        .setPageToken(pageToken)
        .execute();
    List<TableRow> rows = queryResult.getRows();
    System.out.print("\nQuery Results, Page #" + page + ":\n------------\n");
    for (TableRow row : rows) {
      for (TableCell field : row.getF()) {
        System.out.printf("%-50s", field.getV());
      }
      System.out.println();
    }
    if (queryResult.getPageToken() != null) {
      pageToken = queryResult.getPageToken();
      moreResults = true;
      page++;
    } else {
      moreResults = false;
    }
  } while (moreResults);
}

0 votes

For anyone stumbling across this in 2024 and using the Java client, there is a shorter way to take advantage of the results being exported to the anonymous temporary table (i.e. the destination table as in QueryJobConfiguration.setDestinationTable), but it also requires running the query as a Job rather than using just one of the BigQuery.query methods.

Here is the usual code - you need a BigQuery service object named bigQuery somewhere in your compilation unit...

/**
 * Uses a Job to query so we can access the anonymous Destination Table where the results
 * are cached. This enables walking through paginated results in the service layer.
 * @param query sql query
 * @param limit max results per page
 * @return the result of the query, using the anonymous Destination Table as the source
 */
protected TableResult _queryPaginate(String query, Integer limit) {
    // Use standard SQL syntax for queries.
    // See: https://cloud.google.com/bigquery/sql-reference/
    QueryJobConfiguration queryConfig =
            QueryJobConfiguration.newBuilder(query)
                    .setUseLegacySql(false)
                    .build()
    // Create a job ID so that we can safely retry.
    JobId jobId = JobId.of(UUID.randomUUID().toString())
    Job queryJob = bigQuery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build())

    // Wait for the query to complete.
    queryJob = queryJob.waitFor()
    // Check for errors before touching the job again
    if (queryJob == null) {
        throw new RuntimeException("Job no longer exists")
    } else if (queryJob.getStatus().getError() != null) {
        // You can also look at queryJob.getStatus().getExecutionErrors() for all
        // errors, not just the latest one.
        throw new RuntimeException(queryJob.getStatus().getError().toString())
    }
    log.info "queryJob complete:${queryJob.getJobId()}"

    def queryJobConfig = queryJob.getConfiguration() as QueryJobConfiguration
    def resultsTable = queryJobConfig.getDestinationTable()
    log.info "list from resultsTable:$resultsTable"

    // First get the results so we can access the schema,
    // then list the results from the anonymous table for pagination.
    def results = queryJob.getQueryResults()
    return bigQuery.listTableData(resultsTable, results.getSchema(), BigQuery.TableDataListOption.pageSize(limit))
}
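
The listTableData call above only returns the first page of up to limit rows. As a minimal sketch that is not part of the original answer (the PageWalker class and printAllPages method are illustrative names, assuming the same google-cloud-bigquery client), a caller could walk the remaining pages through the TableResult it gets back:

import com.google.cloud.bigquery.FieldValue;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.TableResult;

public class PageWalker {

  /**
   * Walks every page of an already-paginated TableResult (for example the value
   * returned by _queryPaginate above) and prints each row. getNextPage() re-issues
   * tabledata.list under the hood using the stored page token, so each iteration
   * fetches at most one more page from the anonymous destination table.
   */
  static void printAllPages(TableResult page) {
    int pageNumber = 1;
    while (page != null) {
      System.out.println("Page #" + pageNumber);
      for (FieldValueList row : page.getValues()) { // getValues() yields the current page only
        for (FieldValue field : row) {
          System.out.printf("%-50s", field.getValue());
        }
        System.out.println();
      }
      // Stop once the result no longer advertises a next page.
      page = page.hasNextPage() ? page.getNextPage() : null;
      pageNumber++;
    }
  }
}

If you would rather hand the token around explicitly (for example between stateless service calls), page.getNextPageToken() exposes the same value, which can be passed back through BigQuery.TableDataListOption.pageToken(...) on a later listTableData call.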
