I used the Google documentation sample code to upload a CSV file (in Cloud Storage) to BigQuery. I end up with many duplicate rows, because the CSV file contains many duplicates. How can I check for and avoid uploading duplicates?
```python
from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the table to create.
# table_id = "your-project.your_dataset.your_table_name"

job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("name", "STRING"),
        bigquery.SchemaField("post_abbr", "STRING"),
    ],
    skip_leading_rows=1,
    # The source format defaults to CSV, so the line below is optional.
    source_format=bigquery.SourceFormat.CSV,
)
uri = "gs://cloud-samples-data/bigquery/us-states/us-states.csv"

load_job = client.load_table_from_uri(
    uri, table_id, job_config=job_config
)  # Make an API request.

load_job.result()  # Waits for the job to complete.

destination_table = client.get_table(table_id)  # Make an API request.
print("Loaded {} rows.".format(destination_table.num_rows))
```
I have two options in mind:

To avoid and remove duplicates in BigQuery, you can use a MERGE query. A MERGE query lets you update rows that already exist in the target table and insert only the rows that do not exist yet.

Example:
```sql
MERGE dataset.Inventory T
USING dataset.NewArrivals S
ON T.product = S.product
WHEN MATCHED THEN
  UPDATE SET quantity = T.quantity + S.quantity
WHEN NOT MATCHED THEN
  INSERT (product, quantity) VALUES(product, quantity)
```
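The MERGE above can be run from Python with `client.query`. A minimal sketch; it assumes the `dataset.Inventory` and `dataset.NewArrivals` tables from the example exist, and `client` is a `google.cloud.bigquery.Client`:

```python
# SQL from the example above: update quantities for existing products,
# insert rows for products not yet in the inventory table.
MERGE_SQL = """
MERGE dataset.Inventory T
USING dataset.NewArrivals S
ON T.product = S.product
WHEN MATCHED THEN
  UPDATE SET quantity = T.quantity + S.quantity
WHEN NOT MATCHED THEN
  INSERT (product, quantity) VALUES(product, quantity)
"""

def run_merge(client):
    """Run the MERGE with a bigquery.Client; return affected row count."""
    job = client.query(MERGE_SQL)  # Make an API request.
    job.result()  # Waits for the job to complete.
    return job.num_dml_affected_rows

# Usage (requires credentials and the tables above):
# from google.cloud import bigquery
# print("Affected rows:", run_merge(bigquery.Client()))
```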
To implement this use case, you can follow these steps:

1. Truncate the staging table.
2. Load the CSV file into the staging table.
3. Run a MERGE query from the staging table into the final table.

An example script:
```python
from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set staging_table_id to the ID of the staging table.
staging_table_id = "your-project.your_dataset.your_table_name"

# Truncate the staging table.
truncate_query = client.query(
    f"TRUNCATE TABLE `{staging_table_id}`"
)
truncate_results = truncate_query.result()

job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("name", "STRING"),
        bigquery.SchemaField("post_abbr", "STRING"),
    ],
    skip_leading_rows=1,
    # The source format defaults to CSV, so the line below is optional.
    source_format=bigquery.SourceFormat.CSV,
)
uri = "gs://cloud-samples-data/bigquery/us-states/us-states.csv"

# Load the CSV file into the staging table.
load_job = client.load_table_from_uri(
    uri, staging_table_id, job_config=job_config
)  # Make an API request.

load_job.result()  # Waits for the job to complete.

destination_table = client.get_table(staging_table_id)  # Make an API request.
print("Loaded {} rows.".format(destination_table.num_rows))

# Execute the merge query between the staging and final tables
# (fill in a MERGE statement such as the example above).
merge_query = ...
merge_results = merge_query.result()
```
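The merge step left open in the script could be built like this. A minimal sketch, assuming a hypothetical final table `your-project.your_dataset.final_table` and that `post_abbr` uniquely identifies a row (adapt the table IDs and key columns to your data):

```python
staging_table_id = "your-project.your_dataset.your_table_name"
final_table_id = "your-project.your_dataset.final_table"  # hypothetical

# Insert only staging rows whose post_abbr is not already in the final
# table; existing rows are left untouched, so duplicates never accumulate.
merge_sql = f"""
MERGE `{final_table_id}` T
USING `{staging_table_id}` S
ON T.post_abbr = S.post_abbr
WHEN NOT MATCHED THEN
  INSERT (name, post_abbr) VALUES (name, post_abbr)
"""

# With a client (requires credentials):
# merge_query = client.query(merge_sql)
# merge_results = merge_query.result()
```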
You can also check out an article I wrote, remove duplicate bq batch pipeline, which shows and explains a deduplication use case based on a merge query.