将 CSV 文件上传到分区的 bigquery 表中(根据文件名生成分区)

问题描述 投票:0回答:2

我正在使用bigquery客户端对象将一些CSV文件(位于云存储中)上传到bigquery表中。

我设法将数据上传到 bigquery 表中,但我想将目标表更改为分区表。分区将是文件名中的日期。

文件名是CSV文件中的一列,与CSV文件名相同。

这就是我从文件名中提取日期的方法(假设文本是文件名)date1 稍后将用作我们的分区:

text = 'sales_2022-09-09T21-27-05_018787'
match = re.search(r'\d{4}-\d{2}-\d{2}', text)
date1 = datetime.strptime(match.group(), '%Y-%m-%d').date()

这是将数据上传到 BQ 的方法:

client = bigquery.Client.from_service_account_json(CREDENTIALS_LOCATION)
def upload_from_gcs_to_bq(project_id, dataset_id, gsutil_uri, table_name,gcs_blob):
    table_id = project_id +'.'+ dataset_id +'.'+ table_name

    uri = gsutil_uri + '/' + gcs_blob +'.csv'

    job_config = bigquery.LoadJobConfig(
        schema=[
            bigquery.SchemaField("filename", "STRING"),
            bigquery.SchemaField("sales_category", "STRING"),
            ...
        ],

        skip_leading_rows=1,
        # time_partitioning=bigquery.TimePartitioning(
        #     type_=bigquery.TimePartitioningType.DAY,
        #     field="date",  # Name of the column to use for partitioning.
        #     expiration_ms=7776000000,  # 90 days.
        # ),
    )    
    load_job = client.load_table_from_uri(
        uri, table_id, job_config=job_config
    ) 
    load_job.result()  # Wait for the job to complete.
    table = client.get_table(table_id)

def main():
    upload_from_gcs_to_bq(project_id, dataset_id, gsutil_uri, table_name,gcs_blob)

if __name__ == '__main__':
    main()
python google-cloud-platform google-bigquery
2个回答
1
投票

我认为最好利用外部表,因为您的数据已经存储在云存储中。

您可以通过直接读取 CSV 文件来创建永久或临时外部表。

https://cloud.google.com/bigquery/docs/external-data-cloud-storage

然后将信息加载到按您目标字段分区的表中。

如果您有分区文件,还有一个不错的选择将它们加载为外部表,但您需要遵循云存储中的特定格式

https://cloud.google.com/bigquery/docs/hive-partitioned-queries-gcs


© www.soinside.com 2019 - 2024. All rights reserved.