我在工作流程中遇到了一些模式演变,但我找不到让它发挥作用的方法。
上周,我在 ERP 系统中启用了 5 列,业务需要在表中添加第 6 列。
我的
Deltatable
是在我只有5列时创建的,所以我在Merge into
函数中遇到了问题,现在无法处理额外的列。
如果我不使用具有
foreachBatch
且包含 def
的 Merge into
方法,我可以使用 df.writeStream.format("delta").option("overwriteSchema", True)
这行代码来解决该问题,它会自动在其中添加列。我尝试将它与 foreachBatch
一起使用,但它仍然存在合并问题,原因很明显。
我的代码:
def update_insert(df, epochId, cdm):
deltaTable = DeltaTable.forPath(spark,f"abfss://{container_write}@{storage_write}.dfs.core.windows.net/D365/{cdm}"+"_ao")
deltaTable.alias('table') \
.merge(dfUpdates.alias("newData"),
string
) \
.whenMatchedUpdate(set =
dictionary
) \
.whenNotMatchedInsert(values =
dictionary
) \
.execute()
df.writeStream.format("delta").option("overwriteSchema", True).foreachBatch(lambda df, epochId: update_insert(df, epochId, cdm)).option("checkpointLocation", checkpoint_directory).trigger(availableNow=True).start().awaitTermination()
我最好也想在我的
Deltatable
中包含该额外的列。我怎样才能做到这一点?
mergeSchema
或 overwriteSchema
不适用于 MERGE - 相反,您需要将 Spark conf 属性 spark.databricks.delta.schema.autoMerge.enabled
设置为 true
,如 MERGE
文档中所述。
附注您不需要
.format("delta").option("overwriteSchema", True)
与 foreachBatch
...