将 csv 上传到 bigquery 后避免重复

问题描述 投票:0回答:1

我使用谷歌文档示例代码将 CSV 文件(在云存储中)上传到 bigquery。 我最终有很多重复行,因为 CSV 文件中有很多重复项。我如何检查并避免上传重复项?

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the table to create.
# table_id = "your-project.your_dataset.your_table_name"

job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("name", "STRING"),
        bigquery.SchemaField("post_abbr", "STRING"),
    ],
    skip_leading_rows=1,
    # The source format defaults to CSV, so the line below is optional.
    source_format=bigquery.SourceFormat.CSV,
)
uri = "gs://cloud-samples-data/bigquery/us-states/us-states.csv"

load_job = client.load_table_from_uri(
    uri, table_id, job_config=job_config
)  # Make an API request.

load_job.result()  # Waits for the job to complete.

destination_table = client.get_table(table_id)  # Make an API request.
print("Loaded {} rows.".format(destination_table.num_rows))

我心里有两个选择:

  1. 在上面的代码中添加一行可以巧妙地避免上传bigquery中已经存在的行。有这种东西吗?
  2. 使用云函数、云调度程序和发布/订阅每 24 小时检查一次 bigquery 表,如果有任何重复项,将其删除(简单的 SQL 查询删除重复项)此解决方案需要一些额外的工作来设置云函数,调度程序和发布/订阅。数据量为数百GB,因此成本高且效率不高。
google-bigquery
1个回答
0
投票

为避免和删除重复

BigQuery
,您可以使用合并查询

合并查询允许:

  • 如果存在则更新元素
  • 如果不存在则插入元素

例子:

MERGE dataset.Inventory T
USING dataset.NewArrivals S
ON T.product = S.product
WHEN MATCHED THEN
  UPDATE SET quantity = T.quantity + S.quantity
WHEN NOT MATCHED THEN
  INSERT (product, quantity) VALUES(product, quantity)

为了能够开发这个用例,您可以使用以下步骤:

  • 使用与最终表结构相同的临时表(无需在临时表中保留每次执行的数据)
  • 首先是你的 Python 脚本
    truncate
    临时表
  • 加载文件到暂存表
  • 执行合并查询以插入或更新元素(如果存在)
  • 合并查询应用于临时表和最终表之间

您的脚本示例:

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the table to create.
staging_table_id = "your-project.your_dataset.your_table_name"

# Truncate staging table
truncate_query = client.query(
    f"TRUNCATE TABLE `$staging_table_id`"
)

truncate_results = truncate_query.result()

job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("name", "STRING"),
        bigquery.SchemaField("post_abbr", "STRING"),
    ],
    skip_leading_rows=1,
    # The source format defaults to CSV, so the line below is optional.
    source_format=bigquery.SourceFormat.CSV,
)
uri = "gs://cloud-samples-data/bigquery/us-states/us-states.csv"

# Load CSV file to staging file
load_job = client.load_table_from_uri(
    uri, staging_table_id, job_config=job_config
)  # Make an API request.

load_job.result()  # Waits for the job to complete.

destination_table = client.get_table(table_id)  # Make an API request.
print("Loaded {} rows.".format(destination_table.num_rows))

# Execute merge query between staging and final table
merge_query = ....

merge_results = merge_query.result()

您还可以查看我写的这篇文章remove duplicate bq batch pipeline,它显示并解释了一个重复数据删除用例

merge query

© www.soinside.com 2019 - 2024. All rights reserved.