将数据从Google Storage加载到BigQuery时如何执行UPSERT?

问题描述 投票:2回答:2

BigQuery支持以下策略:

WRITE_APPEND - 指定可以将行附加到现有表。

WRITE_EMPTY - 指定输出表必须为空。

WRITE_TRUNCATE - 指定write应替换表。

它们都不适合UPSERT操作的目的。

我正在将订单Json文件导入Google Storage,我想将其加载到BigQuery中。由于逻辑表明某些记录将是新的,而其他记录已经存在,并且需要更新(例如更新订单状态(新的/暂停/发送/退款等...)

我正在使用Airflow,但我的问题是通用的:

update_bigquery = GoogleCloudStorageToBigQueryOperator(
    dag=dag,
    task_id='load_orders_to_BigQuery',
    bucket=GCS_BUCKET_ID,
    destination_project_dataset_table=table_name_template,
    source_format='NEWLINE_DELIMITED_JSON',
    source_objects=[gcs_export_uri_template],
    schema_fields=dc(),
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_TRUNCATE',
    skip_leading_rows = 1,
    google_cloud_storage_conn_id=CONNECTION_ID,
    bigquery_conn_id=CONNECTION_ID
)

此代码使用WRITE_TRUNCATE,这意味着它删除整个表并加载所请求的文件。

如何修改它以支持UPSERT

我唯一的选择是查询表搜索json中出现的现有订单是否删除它们然后执行LOAD

google-bigquery
2个回答
3
投票

您可以运行一个查询,而不是运行GoogleCloudStorageToBigQueryOperator,它会提供与upsert相同的结果。

来自https://cloud.google.com/bigquery/docs/reference/standard-sql/dml-syntax#merge_statement的示例:

MERGE dataset.Inventory T
USING dataset.NewArrivals S
ON T.product = S.product
WHEN MATCHED THEN
  UPDATE SET quantity = T.quantity + S.quantity
WHEN NOT MATCHED THEN
  INSERT (product, quantity) VALUES(product, quantity)

此查询将:

  • 看一下表T(当前)和S(更新)。
  • 如果更新更改现有行,它将在该行上运行UPDATE
  • 如果更新有一个尚未存在的产品,它将INSERT新行。

现在,BigQuery如何知道你的桌子S?你可以:


0
投票

MERGE不支持DELETE+INSERT *'。 G'问题跟踪器中有一项功能请求,如果您想加注它。

我们也使用AF和加载订单;-)。因为我们想要保留历史更改,所以我们加载到一个表中,然后根据主键字段运行deDup查询。结果保存在单独的表中(截断)。该表具有我们订单行的最新版本/状态,然后我们将其用于后续查询。

查找SQL示例的重复数据删除和ROW_NUM()

请注意,根据卷的不同,您可能不需要实现,视图或子查询可能同样有效。

© www.soinside.com 2019 - 2024. All rights reserved.