Error when moving BigQuery table snapshots to Google Cloud Storage


I am trying to create snapshots of BigQuery tables and transfer them to Google Cloud Storage. I can create the snapshots successfully, but I run into an error when I try to move them to GCS.

google.api_core.exceptions.BadRequest: 400 melodic-map-408207:kebron_dataset.GCS_SNAPSHOT_customer_2024-02-28 is not allowed for this operation because it is currently a SNAPSHOT.; reason: invalid, message: melodic-map-408207:kebron_dataset.GCS_SNAPSHOT_customer_2024-02-28 is not allowed for this operation because it is currently a SNAPSHOT

Below is my code:

import os
import base64
import datetime
import json
from dotenv import load_dotenv
from google.cloud import bigquery
from google.cloud import storage
from google.oauth2 import service_account
from google.auth.transport.requests import AuthorizedSession

load_dotenv()

# Access environment variables
SERVICE_ACCOUNT_FILE = 'credentials.json'
PROJECT_ID = os.getenv("PROJECT_ID")
DATASET_ID = os.getenv("DATASET_ID")
FILE_EXTENSION = os.getenv("FILE_EXTENSION")

# Construct the URL for the API endpoint
BASE_URL = "https://bigquery.googleapis.com"

credentials = service_account.Credentials.from_service_account_file(
        SERVICE_ACCOUNT_FILE,
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
)

def move_snapshots_to_gcs(env_vars):

    # access environment variables passed in from the scheduler
    BUCKET_NAME = env_vars.get("BUCKET_NAME")
    LOCATION = env_vars.get("LOCATION")

    authed_session = AuthorizedSession(credentials)

    # Generate a timestamp string
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

    # Generate the folder name with the current date
    folder_name = f'SNAPSHOT_{timestamp.split("_")[0]}'

    # Initialize a GCS client
    storage_client = storage.Client(credentials=credentials)

    # Get the bucket
    bucket = storage_client.bucket(BUCKET_NAME)

    # Check if the folder exists in the bucket
    folder_blob = bucket.blob(folder_name + "/")

    if not folder_blob.exists():
        # Create the folder
        folder_blob.upload_from_string("")
        print(f"Folder {folder_name} created.")

    # Check if the flag file exists in the folder of that bucket
    flag_blob = bucket.blob(f"{folder_name}/backup_flag")

    if flag_blob.exists():
        print("Backup has already been performed. Exiting...")
        return

    # Construct a BigQuery client object with the service account credentials
    client = bigquery.Client(credentials=credentials)

    # List tables in the specified dataset
    tables = client.list_tables(DATASET_ID)
 
    print("Tables contained in '{}':".format(DATASET_ID))
    for table in tables:
        
        TABLE_ID = table.table_id
        SNAPSHOT_TABLE_ID = f"GCS_SNAPSHOT_{TABLE_ID}_{timestamp.split('_')[0]}"

        # Make the GET request with the access token
        response_status_code = create_table_snapshot()
        print(response_status_code)
        if response_status_code == 200:
            print(response_status_code)


            # Define the filename with the timestamp and filetype as parquet
            filename = f"{table.table_id.split('.')[-1]}_{timestamp}.{FILE_EXTENSION}"

            # Format the destination URI
            destination_uri = f"gs://{BUCKET_NAME}/{folder_name}/{filename}"
            dataset_ref = bigquery.DatasetReference(PROJECT_ID, DATASET_ID)
            table_ref = dataset_ref.table(SNAPSHOT_TABLE_ID)

            extract_job = client.extract_table(
                table_ref,
                destination_uri,
                # Location must match that of the source table.
                location=LOCATION,
            )  # API request
            extract_job.result()  # Waits for job to complete.

            print(
                "Exported {}:{}.{} to {}".format(table.project, table.dataset_id, table.table_id, destination_uri)
            )


def build_json_body(TABLE_ID, SNAPSHOT_TABLE_ID):
    json_body = f'''
            {{
                "configuration": {{
                    "copy": {{
                    "sourceTables": [
                        {{
                            "projectId": "{PROJECT_ID}",
                            "datasetId": "{DATASET_ID}",
                            "tableId": "{TABLE_ID}"
                        }}
                    ],
                    "destinationTable": 
                    {{
                        "projectId": "{PROJECT_ID}",
                        "datasetId": "{DATASET_ID}",
                        "tableId": "{SNAPSHOT_TABLE_ID}"
                    }},
                    "operationType": "SNAPSHOT",
                    "writeDisposition": "WRITE_EMPTY"
                    }}
                }}
            }}
    '''
    return json.loads(json_body)

def create_table_snapshot():
    # Generate a timestamp string
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

    authed_session = AuthorizedSession(credentials)

    # Construct a BigQuery client object with the service account credentials
    client = bigquery.Client(credentials=credentials)

    # List tables in the specified dataset
    tables = client.list_tables(DATASET_ID)
    
    last_response_status = None  # Variable to store the last response status code

    for table in tables:

        TABLE_ID = table.table_id
        SNAPSHOT_TABLE_ID = f"GCS_SNAPSHOT_{TABLE_ID}_{timestamp.split('_')[0]}"

        payload = build_json_body(TABLE_ID, SNAPSHOT_TABLE_ID)

        # Make the POST request to create the snapshot
        response = authed_session.post(
            url=f'{BASE_URL}/bigquery/v2/projects/{PROJECT_ID}/jobs',
            json=payload,
            headers={'Content-Type': 'application/json'}
        )

        print(response.json())

        last_response_status = response.status_code  # Update the status code

    return last_response_status

if __name__ == "__main__":
    #print(build_json_body('melodic-map-408207', 'kebron_dataset', 'student'))
    move_snapshots_to_gcs({'BUCKET_NAME': 'kebron_daily_backup_bucket', 'LOCATION': 'US'})

The error I get is the one shown above.

I also tried doing this manually and ran into a similar error.

Shouldn't this work? Why isn't it running as expected?

python google-bigquery google-cloud-storage google-oauth google-bigquery-storage-api
1 Answer

Based on your use case, what you want is to "export table data to Cloud Storage". The documentation walks you through the whole process, either programmatically or through the console UI.

Make sure to review the limitations mentioned by @Kolban, as well as the required permissions and location considerations.

Sample code:

import com.google.cloud.RetryOption;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
import org.threeten.bp.Duration;

public class ExtractTableToCsv {

  public static void runExtractTableToCsv() {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "bigquery-public-data";
    String datasetName = "samples";
    String tableName = "shakespeare";
    String bucketName = "my-bucket";
    String destinationUri = "gs://" + bucketName + "/path/to/file";
    // For more information on export formats available see:
    // https://cloud.google.com/bigquery/docs/exporting-data#export_formats_and_compression_types
    // For more information on Job see:
    // https://googleapis.dev/java/google-cloud-clients/latest/index.html?com/google/cloud/bigquery/package-summary.html

    String dataFormat = "CSV";
    extractTableToCsv(projectId, datasetName, tableName, destinationUri, dataFormat);
  }

  // Exports datasetName:tableName to destinationUri as raw CSV
  public static void extractTableToCsv(
      String projectId,
      String datasetName,
      String tableName,
      String destinationUri,
      String dataFormat) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

      TableId tableId = TableId.of(projectId, datasetName, tableName);
      Table table = bigquery.getTable(tableId);

      Job job = table.extract(dataFormat, destinationUri);

      // Blocks until this job completes its execution, either failing or succeeding.
      Job completedJob =
          job.waitFor(
              RetryOption.initialRetryDelay(Duration.ofSeconds(1)),
              RetryOption.totalTimeout(Duration.ofMinutes(3)));
      if (completedJob == null) {
        System.out.println("Job not executed since it no longer exists.");
        return;
      } else if (completedJob.getStatus().getError() != null) {
        System.out.println(
            "BigQuery was unable to extract due to an error: \n" + job.getStatus().getError());
        return;
      }
      System.out.println(
          "Table export successful. Check in GCS bucket for the " + dataFormat + " file.");
    } catch (BigQueryException | InterruptedException e) {
      System.out.println("Table extraction job was interrupted. \n" + e.toString());
    }
  }
}
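
Since your script is in Python, a roughly equivalent minimal sketch with the google-cloud-bigquery client could look like the one below. The table and bucket names are placeholders taken from your question, and the extract job reads the base table rather than the snapshot, since your error message indicates a snapshot table is not accepted as the source of an extract job.

# Minimal sketch: export a base table to GCS with google-cloud-bigquery.
# Assumes the service account can run extract jobs on the table and write
# to the bucket; names below are placeholders from the question.
from google.cloud import bigquery

PROJECT_ID = "melodic-map-408207"
DATASET_ID = "kebron_dataset"
TABLE_ID = "customer"  # export the base table, not the snapshot table
DESTINATION_URI = "gs://kebron_daily_backup_bucket/exports/customer.parquet"

client = bigquery.Client(project=PROJECT_ID)

table_ref = bigquery.DatasetReference(PROJECT_ID, DATASET_ID).table(TABLE_ID)
job_config = bigquery.ExtractJobConfig(destination_format="PARQUET")

extract_job = client.extract_table(
    table_ref,
    DESTINATION_URI,
    job_config=job_config,
    location="US",  # must match the dataset's location
)
extract_job.result()  # wait for the export job to finish

print(f"Exported {PROJECT_ID}:{DATASET_ID}.{TABLE_ID} to {DESTINATION_URI}")

If you want Parquet files as in your script, keep destination_format="PARQUET"; CSV, Avro, and newline-delimited JSON exports are also supported.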