我正在尝试为 BigQuery 表生成快照并将其传输到 Google Cloud Storage。虽然我可以成功创建快照,但在尝试将它们移动到 GCS 时遇到错误。
google.api_core.exceptions.BadRequest:400 melodic-map-408207:kebron_dataset.GCS_SNAPSHOT_customer_2024-02-28 不允许执行此操作,因为它当前是 SNAPSHOT。;原因:无效,消息:melodic-map-408207:kebron_dataset.GCS_SNAPSHOT_customer_2024-02-28 不允许执行此操作,因为它当前是 SNAPSHOT
下面是我的代码
import os
import base64
import datetime
import json
from dotenv import load_dotenv
from google.cloud import bigquery
from google.cloud import storage
from google.oauth2 import service_account
from google.auth.transport.requests import AuthorizedSession
# Load variables from a local .env file into the process environment.
load_dotenv()
# Access environment variables
SERVICE_ACCOUNT_FILE = 'credentials.json'  # path to the service-account key file
PROJECT_ID = os.getenv("PROJECT_ID")
DATASET_ID = os.getenv("DATASET_ID")
FILE_EXTENSION = os.getenv("FILE_EXTENSION")  # extension used when naming exported objects
# Construct the URL for the API endpoint
BASE_URL = f"https://bigquery.googleapis.com"
# Service-account credentials shared by every BigQuery/GCS client below.
credentials = service_account.Credentials.from_service_account_file(
    SERVICE_ACCOUNT_FILE,
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
def move_snapshots_to_gcs(env_vars):
    """Snapshot every table in DATASET_ID and export the snapshots to GCS.

    Creates a dated pseudo-folder (``SNAPSHOT_<YYYY-MM-DD>``) in the bucket,
    uses a ``backup_flag`` blob to make the job idempotent per folder, then
    runs one extract job per snapshot table.

    Args:
        env_vars: Mapping supplied by the scheduler with keys
            "BUCKET_NAME" (target GCS bucket) and "LOCATION"
            (BigQuery job location; must match the source dataset).

    NOTE(review): BigQuery rejects extract jobs whose source is a SNAPSHOT
    table (400 "... not allowed ... because it is currently a SNAPSHOT").
    To export data, extract from the base table instead of the snapshot —
    confirm the intended behavior.
    """
    BUCKET_NAME = env_vars.get("BUCKET_NAME")
    LOCATION = env_vars.get("LOCATION")

    # One timestamp drives both the folder name (date part) and file names.
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    folder_name = f'SNAPSHOT_{timestamp.split("_")[0]}'

    storage_client = storage.Client(credentials=credentials)
    bucket = storage_client.bucket(BUCKET_NAME)

    # Create the (pseudo-)folder if it does not exist yet.
    folder_blob = bucket.blob(folder_name + "/")
    if not folder_blob.exists():
        folder_blob.upload_from_string("")
        print(f"Folder {folder_name} created.")

    # Abort if today's backup already ran (flag file present).
    flag_blob = bucket.blob(f"{folder_name}/backup_flag")
    if flag_blob.exists():
        print("Backup has already been performed. Exiting...")
        return

    client = bigquery.Client(credentials=credentials)

    # BUG FIX: create_table_snapshot() snapshots *all* tables in the
    # dataset, so call it once up front instead of once per table as the
    # original loop did.
    response_status_code = create_table_snapshot()
    print(response_status_code)
    if response_status_code != 200:
        return

    print("Tables contained in '{}':".format(DATASET_ID))
    for table in client.list_tables(DATASET_ID):
        snapshot_table_id = f"GCS_SNAPSHOT_{table.table_id}_{timestamp.split('_')[0]}"
        # Define the filename with the timestamp and the configured extension.
        filename = f"{table.table_id.split('.')[-1]}_{timestamp}.{FILE_EXTENSION}"
        # BUG FIX: the destination URI previously ended in a literal
        # "(unknown)" placeholder; use the per-table filename instead.
        destination_uri = f"gs://{BUCKET_NAME}/{folder_name}/{filename}"
        dataset_ref = bigquery.DatasetReference(PROJECT_ID, DATASET_ID)
        table_ref = dataset_ref.table(snapshot_table_id)
        extract_job = client.extract_table(
            table_ref,
            destination_uri,
            # Location must match that of the source table.
            location=LOCATION,
        )  # API request
        extract_job.result()  # Waits for job to complete.
        print(
            "Exported {}:{}.{} to {}".format(
                table.project, table.dataset_id, table.table_id, destination_uri
            )
        )
def build_json_body(TABLE_ID, SNAPSHOT_TABLE_ID):
    """Build the jobs.insert request body for a SNAPSHOT copy job.

    Args:
        TABLE_ID: Id of the source table inside DATASET_ID.
        SNAPSHOT_TABLE_ID: Id of the destination snapshot table.

    Returns:
        dict: Payload for the BigQuery ``jobs.insert`` REST endpoint.
    """
    # Build the payload as a dict instead of interpolating an f-string into
    # JSON text and round-tripping through json.loads: simpler, and ids
    # containing quotes or backslashes can no longer produce invalid JSON.
    return {
        "configuration": {
            "copy": {
                "sourceTables": [
                    {
                        "projectId": PROJECT_ID,
                        "datasetId": DATASET_ID,
                        "tableId": TABLE_ID,
                    }
                ],
                "destinationTable": {
                    "projectId": PROJECT_ID,
                    "datasetId": DATASET_ID,
                    "tableId": SNAPSHOT_TABLE_ID,
                },
                "operationType": "SNAPSHOT",
                "writeDisposition": "WRITE_EMPTY",
            }
        }
    }
def create_table_snapshot():
    """Create a dated SNAPSHOT copy of every table in DATASET_ID.

    Issues one ``jobs.insert`` request per table through the BigQuery REST
    API using the module-level service-account credentials.

    Returns:
        int | None: HTTP status code of the last ``jobs.insert`` response,
        or None if the dataset contains no tables.
    """
    # Date part of the timestamp suffixes the snapshot table names.
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    authed_session = AuthorizedSession(credentials)
    # Construct a BigQuery client object with the service account credentials.
    client = bigquery.Client(credentials=credentials)
    last_response_status = None  # Status code of the most recent response.
    for table in client.list_tables(DATASET_ID):
        snapshot_table_id = f"GCS_SNAPSHOT_{table.table_id}_{timestamp.split('_')[0]}"
        # BUG FIX: the request body was previously bound to a local named
        # `json`, shadowing the imported json module inside this function.
        payload = build_json_body(table.table_id, snapshot_table_id)
        # Make the POST request to create the snapshot copy job.
        response = authed_session.post(
            url=f'{BASE_URL}/bigquery/v2/projects/{PROJECT_ID}/jobs',
            json=payload,
            headers={'Content-Type': 'application/json'},
        )
        print(response.json())
        last_response_status = response.status_code
    return last_response_status
if __name__ == "__main__":
    #print(build_json_body('melodic-map-408207', 'kebron_dataset', 'student'))
    # Manual entry point: mimics the env-var mapping the scheduler passes in.
    move_snapshots_to_gcs({'BUCKET_NAME': 'kebron_daily_backup_bucket', 'LOCATION': 'US'})
这不应该正常工作吗?为什么它没有按预期运行?
根据您的用例,您希望"将表数据导出到云存储"。出现该错误的原因是 BigQuery 不允许以 SNAPSHOT 类型的表作为导出(extract)作业的来源,因此应直接从基表导出数据。相关文档通过代码示例和控制台 UI 两种方式指导您完成整个过程。
示例代码:
import com.google.cloud.RetryOption;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
import org.threeten.bp.Duration;
/** Sample: export a BigQuery table to a Cloud Storage file. */
public class ExtractTableToCsv {

  /** Runs the export with hard-coded sample values. */
  public static void runExtractTableToCsv() {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "bigquery-public-data";
    String datasetName = "samples";
    String tableName = "shakespeare";
    String bucketName = "my-bucket";
    String destinationUri = "gs://" + bucketName + "/path/to/file";
    // For more information on export formats available see:
    // https://cloud.google.com/bigquery/docs/exporting-data#export_formats_and_compression_types
    // For more information on Job see:
    // https://googleapis.dev/java/google-cloud-clients/latest/index.html?com/google/cloud/bigquery/package-summary.html
    String dataFormat = "CSV";
    extractTableToCsv(projectId, datasetName, tableName, destinationUri, dataFormat);
  }

  /**
   * Exports datasetName:tableName to destinationUri in the given format.
   *
   * @param projectId project owning the source table
   * @param datasetName dataset containing the source table
   * @param tableName table to export
   * @param destinationUri gs:// URI the exported file is written to
   * @param dataFormat export format, e.g. "CSV"
   */
  public static void extractTableToCsv(
      String projectId,
      String datasetName,
      String tableName,
      String destinationUri,
      String dataFormat) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

      TableId tableId = TableId.of(projectId, datasetName, tableName);
      Table table = bigquery.getTable(tableId);
      Job job = table.extract(dataFormat, destinationUri);

      // Blocks until this job completes its execution, either failing or succeeding.
      Job completedJob =
          job.waitFor(
              RetryOption.initialRetryDelay(Duration.ofSeconds(1)),
              RetryOption.totalTimeout(Duration.ofMinutes(3)));
      if (completedJob == null) {
        System.out.println("Job not executed since it no longer exists.");
        return;
      } else if (completedJob.getStatus().getError() != null) {
        // BUG FIX: report the error from the *completed* job; the
        // pre-waitFor Job object may carry a stale (null) status.
        System.out.println(
            "BigQuery was unable to extract due to an error: \n"
                + completedJob.getStatus().getError());
        return;
      }
      System.out.println(
          "Table export successful. Check in GCS bucket for the " + dataFormat + " file.");
    } catch (BigQueryException | InterruptedException e) {
      System.out.println("Table extraction job was interrupted. \n" + e.toString());
    }
  }
}