从谷歌云存储桶创建谷歌bigquery表; BQ表名包含GCS子目录对应的日期

问题描述 投票:0回答:1

我想从谷歌云存储桶创建谷歌bigquery表。我想用Python来完成这些。 我用 python 访问 BQ 和 GCS 没有问题。效果很好。但我很难为我想做的事情创建正确的 python 代码。

在云存储中,我有一个存储桶“bucket/data/”,其中包含按日期组织的子目录,格式为日期=年-月-日。这些子目录包含 csv.gz 文件。例如,2023 年 4 月 21 日的全名是“bucket/data/date=2023-04-21”,2023 年 4 月 22 日的全名是“bucket/data/date=2023-04-22”。 ..

我希望bigquery表名包含子目录日期对应的日期。 例如:GCS:bucket/data/date=2023-04-21 - BQ:sessions_202304021

我还需要自动创建bigquery表模式。

在我尝试过但没有成功的Python代码下面:

from google.cloud import bigquery
from google.cloud import storage
import os
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "*********.json"
storage_client = storage.Client()
bq_client = bigquery.Client()
bucket_name = "bucket"
directory_path = "bucket/data/"
blobs = storage_client.list_blobs(bucket_name, prefix=directory_path)
for blob in blobs:
    if blob.name.endswith("/"):
        date = blob.name.split("=")[-1]
        table_name = f"sessions_{date}"
        table_ref = bq_client.dataset("xxxxxxxxxxx").table(sessions_)        
        csv_file_path = os.path.join(directory_path, blob.name, "xxxxxxxxx.csv.gz")        
        job_config = bigquery.LoadJobConfig(
            autodetect=True,
            source_format=bigquery.SourceFormat.CSV,
            skip_leading_rows=1,
            compression=bigquery.Compression.GZIP,
        )
        job = bq_client.load_table_from_uri(
            csv_file_path, table_ref, job_config=job_config
        )
        job.result()
        print(f"Table BigQuery créée: {table.table_id}")
python google-bigquery google-cloud-storage
1个回答
0
投票

您需要创建一个 python 脚本来实现此目的,请按照本文档中列出的说明进行操作。

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the table to create.
table_id = "your-project.your_dataset.your_table_name"

# TODO(developer): Set the external source format of your table.
# Note that the set of allowed values for external data sources is
# different than the set used for loading data (see :class:`~google.cloud.bigquery.job.SourceFormat`).
external_source_format = "AVRO"

# TODO(developer): Set the source_uris to point to your data in Google Cloud
source_uris = [
    "gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/a-twitter.avro",
    "gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/b-twitter.avro",
    "gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/c-twitter.avro",
]

# Create ExternalConfig object with external source format
external_config = bigquery.ExternalConfig(external_source_format)
# Set source_uris that point to your data in Google Cloud
external_config.source_uris = source_uris

# TODO(developer) You have the option to set a reference_file_schema_uri, which points to
# a reference file for the table schema
reference_file_schema_uri = "gs://cloud-samples-data/bigquery/federated-formats-reference-file-schema/b-twitter.avro"

external_config.reference_file_schema_uri = reference_file_schema_uri

table = bigquery.Table(table_id)
# Set the external data configuration of the table
table.external_data_configuration = external_config
table = client.create_table(table)  # Make an API request.

print(
    f"Created table with external source format {table.external_data_configuration.source_format}"
)

早在 2018 年,就在 StackOverflow 上发布了类似的问题,也请查看它,因为它可以帮助您深入了解您的需求。

© www.soinside.com 2019 - 2024. All rights reserved.