底层数据更改时的 Databricks DLT 和 CDC

问题描述 投票:0回答:1

我有一个 DLT,其中底层数据以 parquet 格式存储在 S3 中。该数据可能会被更新和附加。抛开 SCD,仅关注 CDC,我正在尝试找到 SQL 语法,该语法将允许青铜级和后续银级子表从源中引入附加数据,并在发现更改时更新现有行。

Databricks 自己关于这种情况的文档https://docs.databricks.com/en/delta-live-tables/cdc.html#language-sql 表示定义为源,然后使用以下语法应用更改:

CREATE OR REFRESH STREAMING TABLE table_name;

APPLY CHANGES INTO LIVE.table_name
FROM sourceKEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}][TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

但是,当应用此方法并手动将更改插入源数据时,会出现以下错误,该错误明显与 Databricks 文档相矛盾:

Flow 'silv_test' has FAILED fatally. An error occurred because we detected an update or delete to one or more rows in the source table. Streaming tables may only use append-only streaming sources. If you expect to delete or update rows to the source table in the future, please convert table silv_test to a live table instead of a streaming live table. To resolve this issue, perform a Full Refresh to table silv_test. A Full Refresh will attempt to clear all data from table silv_test and then load all data from the streaming source. The non-append change can be found at version 3

如果

APPLY CHANGES
不能解决这个问题,那么应该如何构建DLT来克服这个问题?

databricks cdc delta-live-tables
1个回答
0
投票

首先我们需要记住,DLT 假设数据输入表是流。流意味着表只能追加。即使 APPLY CHANGES 仅在输入表是流/仅附加表时才有效。即它期望将更改的记录附加到输入表中。然后,您可以使用 DLT 使用这些附加记录来更新下游表中的现有记录。

对源表的历史记录进行任何更新或删除都会导致您上面提供的错误。您可以在此处阅读更多相关信息。

对于您的场景,如果源无法附加更改的记录而不是更新它们,那么唯一的选择是将更改应用到每个层(铜牌、银牌、金牌)。您不需要在 DLT 中执行此操作,您可以拥有单独的 pyspark 笔记本。这也适用于 DELETES(示例在特定保留期后仅从青铜层删除数据)。如果您不将skipchangecommits 标志指定为True,您的管道仍然会失败。这告诉 DLT 在流表的情况下忽略对早期历史记录的任何更改。我找不到使用 Databricks SQL 执行此操作的方法,但您可以使用 python 语言参考

© www.soinside.com 2019 - 2024. All rights reserved.