我有一个模式演化案例。
详细说明:
我使用自动加载器和 foreachbatch 将源表从数据湖加载到青铜层 aas 行数据,并具有合并到 statemenet 的功能。完成这项工作没有问题。
从青铜移动到银层时,作为源表,我应用
select
语句在移动到银层时过滤掉额外的列。
我的问题是只有一张桌子。
青铜层中的customeraddress
表具有MSFT_DATASTATE
列,这与银层中的同一个表的情况不同。所以我想将此列自动添加到我的白银表中。
# Enable autoMerge for schema evolution
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
p = re.compile('^BK_')
list_of_columns = dfUpdates.columns
list_of_BK_columns = [ s for s in dfUpdates.columns if p.match(s) ]
string = ''
for column in list_of_BK_columns:
string += f'table.{column} = newData.{column} and '
dictionary = {}
for key in list_of_columns:
dictionary[key] = f'newData.{key}'
# print("printing " + cdm + " columns")
print("We at this stage now -----------------------------------------------------")
# print(dfUpdates.columns)
deltaTable = DeltaTable.forPath(spark, f"abfss://silver@{storage_account}.dfs.core.windows.net/D365/{table.lower()}_ao")
deltaTable.alias('table') \
.merge(dfUpdates.alias("newData"), string) \
.whenMatchedUpdate(set=dictionary) \
.whenNotMatchedInsert(values=dictionary) \
.execute()
df.writeStream.foreachBatch(lambda df, epochId: update_changefeed(df, table, epochId)).option("checkpointLocation", checkpoint_directory).trigger(availableNow=True).start()
我收到的错误指出:
SET 列 ``` 未找到给定列: [
、PK_D365_customeraddress
等]IsDelete
这是正确的,
MSFT_DATASTATE
列不在我的银色增量表中。
参考:https://www.databricks.com/blog/2019/09/24/diving-into-delta-lake-schema-enforcement-evolution.html 尝试使用这个:
# Add the mergeSchema option
loans.write.format("delta") \
.option("mergeSchema", "true") \
.mode("append") \
.save(DELTALAKE_SILVER_PATH)