使用 foreachBatch 进行自动加载器模式演变

问题描述 投票:0回答:1

我在工作流程中遇到了一些模式演变,但我找不到让它发挥作用的方法。

上周,我在 ERP 系统中启用了 5 列,业务需要在表中添加第 6 列。

我的

Deltatable
是在我只有5列时创建的,所以我在
Merge into
函数中遇到了问题,现在无法处理额外的列。

如果我不使用具有

foreachBatch
且包含
def
Merge into
方法,我可以使用
df.writeStream.format("delta").option("overwriteSchema", True)
这行代码来解决该问题,它会自动在其中添加列。我尝试将它与
foreachBatch
一起使用,但它仍然存在合并问题,原因很明显。

我的代码:

    def update_insert(df, epochId, cdm):
    deltaTable = DeltaTable.forPath(spark,f"abfss://{container_write}@{storage_write}.dfs.core.windows.net/D365/{cdm}"+"_ao")
    deltaTable.alias('table') \
  .merge(dfUpdates.alias("newData"),
    string
  ) \
  .whenMatchedUpdate(set =
    dictionary
  ) \
  .whenNotMatchedInsert(values =
    dictionary
  ) \
  .execute()



df.writeStream.format("delta").option("overwriteSchema", True).foreachBatch(lambda df, epochId: update_insert(df, epochId, cdm)).option("checkpointLocation", checkpoint_directory).trigger(availableNow=True).start().awaitTermination()

我最好也想在我的

Deltatable
中包含该额外的列。我怎样才能做到这一点?

apache-spark pyspark azure-databricks spark-structured-streaming delta-lake
1个回答
0
投票

mergeSchema
overwriteSchema
不适用于 MERGE - 相反,您需要将 Spark conf 属性
spark.databricks.delta.schema.autoMerge.enabled
设置为
true
,如
MERGE
文档中所述。

附注您不需要

.format("delta").option("overwriteSchema", True)
foreachBatch
...

© www.soinside.com 2019 - 2024. All rights reserved.