从 Pyspark 中的数据帧插入或更新增量表

问题描述 投票:0回答:3

我当前有一个 pyspark 数据框,我最初使用下面的代码创建了一个增量表 -

df.write.format("delta").saveAsTable("events")

现在,由于上面的数据框根据我的要求每天填充数据,因此为了将新记录附加到增量表中,我使用了以下语法 -

df.write.format("delta").mode("append").saveAsTable("events")

现在我在数据块和集群中完成了这一切。我想知道如何在 python 中编写通用 pyspark 代码,如果增量表不存在,则创建增量表,如果增量表存在,则追加记录。我想做这件事,因为如果我将我的 python 包给某人,他们不会在其环境中具有相同的增量表,因此应该从代码动态创建它。

apache-spark pyspark delta-lake
3个回答
5
投票

如果您还没有Delta表,那么当您使用

append
模式时将会创建它。因此,您不需要编写任何特殊代码来处理表尚不存在和退出的情况。

附注仅当您执行合并到表中而不是追加时,才需要这样的代码。在这种情况下,代码将如下所示:

if table_exists:
  do_merge
else:
  df.write....

附注这是该模式的通用实现


3
投票
最终有两个可用的 Spark 操作

  • saveAsTable
    :- 使用当前 DataFrame 创建或替换表(如果存在或不存在)
  • insertInto
    :- 如果表存在并基于模式(“覆盖”或“追加”)执行操作,则成功。它要求该表在数据库中可用。

.saveAsTable("events")

 基本上每次调用它时都会重写该表。这意味着,即使您之前有一个表存在或不存在,它也会用当前的 DataFrame 值替换该表。相反,您可以执行以下操作以确保安全:

第 1 步:创建表,无论表是否存在。如果存在,请从表中删除数据并附加新的数据框记录,否则创建表并附加数据。

df.createOrReplaceTempView('df_table') spark.sql("create table IF NOT EXISTS table_name using delta select * from df_table where 1=2") df.write.format("delta").mode("append").insertInto("events")
因此,每次它都会检查表是否可用,否则它将创建表并进入下一步。否则,如果表可用,则将数据追加到表中。


1
投票
使用以下语法。

updates = df dest = DeltaTable.forPath(spark, path) dest.alias("events").merge( updates.alias("updates"), 'events.Id = updates.Id and \ events.Type = updates.Type' ).whenMatchedUpdateAll( ).whenNotMatchedInsertAll( ).execute()
    
© www.soinside.com 2019 - 2024. All rights reserved.