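I'm using a Cloud Function to read data from Datastore into BigQuery. The function runs every hour, but I need to delete all rows before rewriting them, and that isn't allowed because of the streaming buffer: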
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
    //Table table = bigquery.getTable(TableId.of(DATASET, TABLE_NAME));
    if (bigquery.getTable(TableId.of(DATASET, TABLE_NAME)) != null) {
        QueryJobConfiguration queryConfig =
            QueryJobConfiguration.newBuilder(
                    "DELETE FROM `" + DATASET + "." + TABLE_NAME + "` WHERE true;")
                .build();
        JobId jobId = JobId.of(UUID.randomUUID().toString());
        Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());
        queryJob = queryJob.waitFor();
        // Check for errors
        if (queryJob == null) {
            throw new RuntimeException("Job no longer exists");
        } else if (queryJob.getStatus().getError() != null) {
            // You can also look at queryJob.getStatus().getExecutionErrors() for all
            // errors, not just the latest one.
            throw new RuntimeException(queryJob.getStatus().getError().toString());
        }
    } else {
        Field id = Field.of("id", StandardSQLTypeName.INT64);
        Field name = Field.of("name", StandardSQLTypeName.STRING);
        Schema schema = Schema.of(id, name);
        TableId tableId = TableId.of(DATASET, TABLE_NAME);
        TableDefinition tableDefinition = StandardTableDefinition.of(schema);
        TableInfo tableInfo = TableInfo.newBuilder(tableId, tableDefinition).build();
        bigquery.create(tableInfo);
    }
    TableRow row = new TableRow();
    for (Map.Entry<String, Object> entry : campaign.entrySet()) {
        row.set("id", entry.getKey()).set("name", entry.getValue());
        bigquery.insertAll(InsertAllRequest.newBuilder(TableId.of(DATASET, TABLE_NAME)).addRow(row).build());
    }
This fails with a runtime error:
"UPDATE or DELETE statement over table would affect rows in the streaming buffer, which is not supported"
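I'm happy to announce that mutating DML statements (UPDATE, DELETE, MERGE) over recently streamed data are now supported in public preview via the BigQuery Storage Write API*! See the feature, and how to get your project allowlisted, here: https://cloud.google.com/bigquery/docs/write-api#use_data_manipulation_language_dml_with_recently_streamed_data.
*This feature only supports data recently streamed through the BigQuery Storage Write API, not the legacy insertAll streaming API.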
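
Separately from the streaming-buffer restriction, the loop in the question reuses a single row object and sends one insertAll request per entry. A minimal sketch of collecting the rows first, so they could go out in a single request, might look like the following. The class name `CampaignRows` and the method `toRows` are hypothetical helpers, not part of the BigQuery client library, and the sketch assumes every key in `campaign` is a decimal integer string that fits the INT64 `id` column.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of the BigQuery client library.
final class CampaignRows {

    private CampaignRows() {}

    // Builds one fresh row map per campaign entry so rows never share state.
    // Assumption: each key is a decimal integer string matching the INT64 "id" column.
    static List<Map<String, Object>> toRows(Map<String, Object> campaign) {
        List<Map<String, Object>> rows = new ArrayList<>();
        for (Map.Entry<String, Object> entry : campaign.entrySet()) {
            Map<String, Object> row = new HashMap<>();
            // Long.parseLong throws NumberFormatException for non-numeric keys.
            row.put("id", Long.parseLong(entry.getKey()));
            row.put("name", entry.getValue());
            rows.add(row);
        }
        return rows;
    }
}
```

Each resulting map could then be passed to `InsertAllRequest.Builder.addRow(...)` and the request sent once with `bigquery.insertAll(...)`. Note that this still uses the legacy insertAll path, so the rows still land in the streaming buffer and the DELETE restriction described above continues to apply.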