使用 Autoloader 进行架构演变

问题描述 投票:0回答:1

我有一个模式演化案例。

详细说明:

我使用自动加载器和 foreachbatch 将源表从数据湖加载到青铜层 aas 行数据,并具有合并到 statemenet 的功能。完成这项工作没有问题。

从青铜移动到银层时,作为源表,我应用

select
语句在移动到银层时过滤掉额外的列。

我的问题是只有一张桌子。

青铜层中的

customeraddress
表具有
MSFT_DATASTATE
列,这与银层中的同一个表的情况不同。所以我想将此列自动添加到我的白银表中。

# Enable autoMerge for schema evolution
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

p = re.compile('^BK_')
list_of_columns = dfUpdates.columns
list_of_BK_columns = [ s for s in dfUpdates.columns if p.match(s) ]
string = ''
for column in list_of_BK_columns:
    string += f'table.{column} = newData.{column} and '
dictionary = {}
for key in list_of_columns:
    dictionary[key] = f'newData.{key}'

# print("printing " + cdm + " columns")
print("We at this stage now -----------------------------------------------------")
# print(dfUpdates.columns)

deltaTable = DeltaTable.forPath(spark, f"abfss://silver@{storage_account}.dfs.core.windows.net/D365/{table.lower()}_ao")
deltaTable.alias('table') \
    .merge(dfUpdates.alias("newData"), string) \
    .whenMatchedUpdate(set=dictionary) \
    .whenNotMatchedInsert(values=dictionary) \
    .execute()

df.writeStream.foreachBatch(lambda df, epochId: update_changefeed(df, table, epochId)).option("checkpointLocation", checkpoint_directory).trigger(availableNow=True).start()

我收到的错误指出:

SET 列 ``` 未找到给定列: [

PK_D365_customeraddress
IsDelete
等]

这是正确的,

MSFT_DATASTATE
列不在我的银色增量表中。

pyspark merge schema azure-databricks autoload
1个回答
0
投票

参考:https://www.databricks.com/blog/2019/09/24/diving-into-delta-lake-schema-enforcement-evolution.html 尝试使用这个:

# Add the mergeSchema option
loans.write.format("delta") \
           .option("mergeSchema", "true") \
           .mode("append") \
           .save(DELTALAKE_SILVER_PATH)
© www.soinside.com 2019 - 2024. All rights reserved.