Databricks Delta Live 表 - 应用增量表中的更改

问题描述 投票:0回答:2

我正在使用 Databricks Delta Live Tables,但在向上游更新插入某些表时遇到一些问题。我知道下面的文字很长,但我试图尽可能清楚地描述我的问题。如果有些部分不清楚,请告诉我。

我有以下表格和流程:

Landing_zone -> 这是一个添加 JSON 文件的文件夹,其中包含插入或更新的记录的数据。 Raw_table -> 这是 JSON 文件中的数据,但采用表格格式。该表采用增量格式。除了将 JSON 结构转换为表格结构(我进行了爆炸,然后从 JSON 键创建列)之外,未进行任何转换。 Intermediate_table -> 这是原始表,但有一些额外的列(取决于其他列值)。

要从我的着陆区转到原始表,我有以下 Pyspark 代码:

cloudfile = {"cloudFiles.format":"JSON", 
                       "cloudFiles.schemaLocation": sourceschemalocation, 
                       "cloudFiles.inferColumnTypes": True}

@dlt.view('landing_view')
def inc_view():
    df = (spark
             .readStream
             .format('cloudFiles')
             .options(**cloudFilesOptions)
             .load(filpath_to_landing)
     <Some transformations to go from JSON to tabular (explode, ...)>
     return df

dlt.create_target_table('raw_table', 
                        table_properties = {'delta.enableChangeDataFeed': 'true'})
  
dlt.apply_changes(target='raw_table',
                  source='landing_view',
                  keys=['id'],
                  sequence_by='updated_at')

此代码按预期工作。我运行它,向着陆区添加一个changes.JSON 文件,重新运行管道,并且更新插入正确地应用于“raw_table”

(但是,每次在 delta 文件夹中创建包含所有数据的新 parquet 文件时,我希望只添加包含插入和更新行的 parquet 文件?并且有关当前版本的一些信息保留在增量日志?不确定这是否与我的问题相关。我已经将“raw_table”的 table_properties 更改为 enableChangeDataFeed = true。“intermediate_table”的 readStream 具有选项(readChangeFeed, 'true'))。

然后我有以下代码从“raw_table”转到“intermediate_table”:

@dlt.table(name='V_raw_table', table_properties={delta.enableChangeDataFeed': 'True'})
def raw_table():
     df = (spark.readStream
                .format('delta')
                .option('readChangeFeed', 'true')
                .table('LIVE.raw_table'))
     df = df.withColumn('ExtraCol', <Transformation>)
     return df
 ezeg
dlt.create_target_table('intermediate_table')
dlt.apply_changes(target='intermediate_table',
                  source='V_raw_table',
                  keys=['id'],
                  sequence_by='updated_at')

不幸的是,当我运行这个时,我收到错误: '在版本 2 的源表中检测到数据更新(例如,part-00000-7127bd29-6820-406c-a5a1-e76fc7126150-c000.snappy.parquet)。目前不支持此操作。如果您想忽略更新,请将选项“ignoreChanges”设置为“true”。如果您希望反映数据更新,请使用新的检查点目录重新启动此查询。'

我检查了“ignoreChanges”,但认为这不是我想要的。我希望自动加载器能够检测增量表中的更改并将它们传递到流程中。

我知道 readStream 仅适用于追加,但这就是为什么我希望在更新“raw_table”后,一个新的 parquet 文件将被添加到仅包含插入和更新的 delta 文件夹中。然后,自动加载器会检测到此添加的镶木地板文件,并可用于将更改应用到“intermediate_table”。

我这样做的方式错了吗?或者我忽略了什么?预先感谢!

databricks delta-lake delta-live-tables databricks-autoloader
2个回答
0
投票

由于 readStream 仅适用于追加,因此源文件中的任何更改都会在下游产生问题。 “raw_table”的更新只会插入新的镶木地板文件的假设是不正确的。基于“优化写入”等设置,甚至没有它,apply_changes 可以添加或删除文件。您可以在“raw_table/_delta_log/xxx.json”中的“numTargetFilesAdded”和“numTargetFilesRemoved”下找到此信息。

基本上,“Databricks 建议您使用 Auto Loader 仅提取不可变文件”。


0
投票

当您更改设置以包含选项“.option('readChangeFeed', 'true')”时,您应该从完全刷新开始(开始附近有下拉菜单)。这样做将解决错误“检测到的数据更新 xxx”,并且您的代码应该适用于增量更新。

© www.soinside.com 2019 - 2024. All rights reserved.