我正在使用bigquery客户端对象将一些CSV文件(位于云存储中)上传到bigquery表中。
我设法将数据上传到 bigquery 表中,但我想将目标表更改为分区表。分区将是文件名中的日期。
文件名是CSV文件中的一列,与CSV文件名相同。
这就是我从文件名中提取日期的方法(假设文本是文件名)date1 稍后将用作我们的分区:
text = 'sales_2022-09-09T21-27-05_018787'
match = re.search(r'\d{4}-\d{2}-\d{2}', text)
date1 = datetime.strptime(match.group(), '%Y-%m-%d').date()
这是将数据上传到 BQ 的方法:
client = bigquery.Client.from_service_account_json(CREDENTIALS_LOCATION)
def upload_from_gcs_to_bq(project_id, dataset_id, gsutil_uri, table_name,gcs_blob):
table_id = project_id +'.'+ dataset_id +'.'+ table_name
uri = gsutil_uri + '/' + gcs_blob +'.csv'
job_config = bigquery.LoadJobConfig(
schema=[
bigquery.SchemaField("filename", "STRING"),
bigquery.SchemaField("sales_category", "STRING"),
...
],
skip_leading_rows=1,
# time_partitioning=bigquery.TimePartitioning(
# type_=bigquery.TimePartitioningType.DAY,
# field="date", # Name of the column to use for partitioning.
# expiration_ms=7776000000, # 90 days.
# ),
)
load_job = client.load_table_from_uri(
uri, table_id, job_config=job_config
)
load_job.result() # Wait for the job to complete.
table = client.get_table(table_id)
def main():
upload_from_gcs_to_bq(project_id, dataset_id, gsutil_uri, table_name,gcs_blob)
if __name__ == '__main__':
main()
我认为最好利用外部表,因为您的数据已经存储在云存储中。
您可以通过直接读取 CSV 文件来创建永久或临时外部表。
https://cloud.google.com/bigquery/docs/external-data-cloud-storage
然后将信息加载到按您目标字段分区的表中。
如果您有分区文件,还有一个不错的选择将它们加载为外部表,但您需要遵循云存储中的特定格式
https://cloud.google.com/bigquery/docs/hive-partitioned-queries-gcs