我想运行包含数千行总结果的 BigQuery 查询,但我只想一次检索包含 100 个结果的页面(使用
maxResults
和 pageToken
参数)。
BigQuery API 支持在
pageToken
方法上使用 collection.list
参数。但是,我正在运行异步查询并使用 getQueryResult
方法检索结果,并且它似乎不支持 pageToken
参数。是否可以将 pageToken
与 getQueryResults
一起使用?
更新:有关于如何在此处翻阅列表结果的新文档。
我自我回答这个问题,因为有开发者私下问过我这个问题,我想在 Stack Overflow 上分享答案。
从 Tabledata.list 方法请求分页结果时可以使用 pageToken 参数。例如,当结果数据超过 100k 行或 10 MB 结果时,结果集会自动分页。您还可以通过显式设置 maxResults 参数来请求结果分页。每页结果都会返回一个 pageToken 参数,然后可以使用该参数检索下一页结果。
每个查询都会生成一个新的 BigQuery 表。如果您没有显式命名该表,它只会持续 24 小时。然而,即使是未命名的“匿名”表也有一个标识符。无论哪种情况,插入查询作业后,检索新创建的表的名称。然后使用 tabledata.list 方法(以及 maxResults/pageToken 参数的组合)以分页形式请求结果。使用之前检索到的 pageToken 循环并继续调用 tabledata.list,直到不再返回 pageToken(意味着您已到达最后一页。
使用适用于 Java 的 Google API 客户端库,插入查询作业、轮询查询完成情况,然后检索一页又一页的查询结果的代码可能如下所示:
// Create a new BigQuery client authorized via OAuth 2.0 protocol
// See: https://developers.google.com/bigquery/docs/authorization#installed-applications
Bigquery bigquery = createAuthorizedClient();
// Start a Query Job
String querySql = "SELECT TOP(word, 500), COUNT(*) FROM publicdata:samples.shakespeare";
JobReference jobId = startQuery(bigquery, PROJECT_ID, querySql);
// Poll for Query Results, return result output
TableReference completedJob = checkQueryResults(bigquery, PROJECT_ID, jobId);
// Return and display the results of the Query Job
displayQueryResults(bigquery, completedJob);
/**
* Inserts a Query Job for a particular query
*/
public static JobReference startQuery(Bigquery bigquery, String projectId,
String querySql) throws IOException {
System.out.format("\nInserting Query Job: %s\n", querySql);
Job job = new Job();
JobConfiguration config = new JobConfiguration();
JobConfigurationQuery queryConfig = new JobConfigurationQuery();
config.setQuery(queryConfig);
job.setConfiguration(config);
queryConfig.setQuery(querySql);
Insert insert = bigquery.jobs().insert(projectId, job);
insert.setProjectId(projectId);
JobReference jobId = insert.execute().getJobReference();
System.out.format("\nJob ID of Query Job is: %s\n", jobId.getJobId());
return jobId;
}
/**
* Polls the status of a BigQuery job, returns TableReference to results if "DONE"
*/
private static TableReference checkQueryResults(Bigquery bigquery, String projectId, JobReference jobId)
throws IOException, InterruptedException {
// Variables to keep track of total query time
long startTime = System.currentTimeMillis();
long elapsedTime;
while (true) {
Job pollJob = bigquery.jobs().get(projectId, jobId.getJobId()).execute();
elapsedTime = System.currentTimeMillis() - startTime;
System.out.format("Job status (%dms) %s: %s\n", elapsedTime,
jobId.getJobId(), pollJob.getStatus().getState());
if (pollJob.getStatus().getState().equals("DONE")) {
return pollJob.getConfiguration().getQuery().getDestinationTable();
}
// Pause execution for one second before polling job status again, to
// reduce unnecessary calls to the BigQUery API and lower overall
// application bandwidth.
Thread.sleep(1000);
}
}
/**
* Page through the result set
*/
private static void displayQueryResults(Bigquery bigquery,
TableReference completedJob) throws IOException {
long maxResults = 20;
String pageToken = null;
int page = 1;
// Default to not looping
boolean moreResults = false;
do {
TableDataList queryResult = bigquery.tabledata().list(
completedJob.getProjectId(),
completedJob.getDatasetId(),
completedJob.getTableId())
.setMaxResults(maxResults)
.setPageToken(pageToken)
.execute();
List<TableRow> rows = queryResult.getRows();
System.out.print("\nQuery Results, Page #" + page + ":\n------------\n");
for (TableRow row : rows) {
for (TableCell field : row.getF()) {
System.out.printf("%-50s", field.getV());
}
System.out.println();
}
if (queryResult.getPageToken() != null) {
pageToken = queryResult.getPageToken();
moreResults = true;
page++;
} else {
moreResults = false;
}
} while (moreResults);
}
对于在 2024 年偶然发现此问题并使用 Java 客户端的任何人来说,有一种更短的方法来管理将结果导出到匿名临时表(即
QueryJobConfiguration.setDestinationTable
中的目标表),但它也需要查询与作业一起使用,而不是仅使用其中一种 BigQuery.query
方法
这是常规代码 - 您需要在编译单元中的某个位置有一个名为
BigQuery
的 bigquery
服务对象...
/**
* Uses a Job to query so we can access the anonymous Destination Table that is where the results
* are cached. This enables walking through paginated results in the service layer
* @param query sql query
* @param limit max results per page
* @return the result of the query, using the anonymous Destination Table as the source
*/
protected TableResult _queryPaginate(String query, Integer limit){
// Use standard SQL syntax for queries.
// See: https://cloud.google.com/bigquery/sql-reference/
QueryJobConfiguration queryConfig =
QueryJobConfiguration.newBuilder(query)
.setUseLegacySql(false)
.build()
// Create a job ID so that we can safely retry.
JobId jobId = JobId.of(UUID.randomUUID().toString());
Job queryJob = bigQuery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build())
// Wait for the query to complete.
queryJob = queryJob.waitFor()
log.info "queryJob complete:${queryJob.getJobId()}"
// Check for errors
if (queryJob == null) {
throw new RuntimeException("Job no longer exists")
} else if (queryJob.getStatus().getError() != null) {
// You can also look at queryJob.getStatus().getExecutionErrors() for all
// errors, not just the latest one.
throw new RuntimeException(queryJob.getStatus().getError().toString())
}
def queryJobConfig = queryJob.getConfiguration() as QueryJobConfiguration
def resultsTable = queryJobConfig.getDestinationTable()
log.info "list from resultsTable:$resultsTable"
//first get the results so we can get the schema
def results = queryJob.getQueryResults()
//then list the results from the anonymous table for pagination
bigQuery.listTableData(resultsTable,results.getSchema(), BigQuery.TableDataListOption.pageSize(limit))
}