如何在GCS中的增量表之上创建BQ外部表并仅显示最新快照

问题描述 投票:0回答:4

我正在尝试在增量表之上创建一个外部 BQ 外部表,该表使用谷歌存储作为存储层。在增量表上,我们执行 DML,其中包括删除。

我可以在所有增量文件都存在的 gs 存储桶顶部创建一个 BQ 外部表。然而,它甚至会拉取删除记录,因为 BQ 外部表无法读取 delta 的事务日志,其中它说明了要考虑哪些 parquet 文件以及要删除哪个文件。

除了以编程方式将数据从 delta 复制到 BQ 之外,还有什么方法可以将 BQ 中 delta 表(gs 位置)的最新快照公开为外部表?

pyspark google-bigquery google-cloud-storage delta-lake
4个回答
4
投票

所以这个问题是一年多前提出的,但我对奥利弗的答案进行了棘手但强大的补充,消除了数据重复和额外的加载逻辑。

第 1 步 正如 Oliver 建议生成 symlink_format_manifest 文件;您可以在每次更新时触发它,也可以将 tblproperty 添加到您的文件中,如here所述 更新增量表时自动创建这些文件;

ALTER TABLE delta.`<path-to-delta-table>` SET TBLPROPERTIES(delta.compatibility.symlinkFormatManifest.enabled=true)

第 2 步 创建一个指向增量表位置的外部表

> bq mkdef --source_format=PARQUET "gs://test-delta/*.parquet" > bq_external_delta_logs
> bq mk --external_table_definition=bq_external_delta_logs test.external_delta_logs

步骤 3 创建另一个指向 symlink_format_manifest/manifest 文件的外部表

> bq mkdef --autodetect --source_format=CSV gs://test-delta/_symlink_format_manifest/manifest > bq_external_delta_manifest
> bq mk --table --external_table_definition=bq_external_delta_manifest test.external_delta_manifest

第 4 步 使用以下查询创建视图

> bq mk \
--use_legacy_sql=false \
--view \
'SELECT
  *
FROM
  `project_id.test.external_delta_logs` 
WHERE
  _FILE_NAME in (select * from `project_id.test.external_delta_logs`)' \
test.external_delta_snapshot

现在,只要从 test.external_delta_snapshot 视图刷新增量表,您就可以获得最新的快照,而无需任何额外的加载或数据重复。 此解决方案的缺点是,如果架构发生更改,您必须手动或使用 BQ 客户端等从 Spark 管道向表定义添加新字段。对于那些对此解决方案如何工作感到好奇的人,请继续阅读.


这是如何运作的;

符号链接清单文件包含新行分隔格式的镶木地板文件列表,指向当前增量版本分区;

gs://delta-test/......-part1.parquet
gs://delta-test/......-part2.parquet
....

除了我们的增量位置之外,我们还通过将此清单文件视为 CSV 文件(实际上是单列 CSV 文件)来定义另一个外部表。我们定义的视图利用了here提到的_FILE_NAME伪列,它指向表中每一行的镶木地板文件位置。如文档中所述,为每个指向存储在Cloud StorageGoogle Drive中的数据的外部表定义了 _FILE_NAME 伪列。

此时,我们已经有了加载最新快照所需的 parquet 文件列表,并且能够使用 _FILE_NAME 列过滤我们想要读取的文件。我们定义的视图只是定义了获取最新快照的过程。每当我们的增量表更新时,清单和增量日志表都会查找最新的数据,因此我们将始终获得最新的快照,而无需任何额外的加载或数据重复。

最后一句话,众所周知,外部表上的执行比 BQ 管理表更昂贵(执行成本),因此最好按照 Oliver 建议的那样尝试双重写入,并按照您的要求尝试外部表解决方案。存储比执行便宜,因此在某些情况下,将数据保存在 GCS 和 BQ 中的成本可能比保存这样的外部表要低。


1
投票

我也在开发这种管道,我们将 Delta Lake 文件转储到 GCS 中并将其呈现在 Bigquery 上。从 GCS 增量文件生成清单文件将根据增量文件当前设置的版本为您提供最新快照。然后,您需要创建一个自定义脚本来解析该清单文件以获取文件列表,然后运行提及这些文件的 bq 加载。

val deltaTable = DeltaTable.forPath(<path-to-delta-table>)
deltaTable.generate("symlink_format_manifest")

0
投票

以下解决方法可能适用于小型数据集。

有一个单独的 BQ 表。 将 delta Lake 文件读入 DataFrame,然后 df.overwrite 读入 BigQuery 表。


© www.soinside.com 2019 - 2024. All rights reserved.