我当前有一个 pyspark 数据框,我最初使用下面的代码创建了一个增量表 -
df.write.format("delta").saveAsTable("events")
现在,由于上面的数据框根据我的要求每天填充数据,因此为了将新记录附加到增量表中,我使用了以下语法 -
df.write.format("delta").mode("append").saveAsTable("events")
现在我在数据块和集群中完成了这一切。我想知道如何在 python 中编写通用 pyspark 代码,如果增量表不存在,则创建增量表,如果增量表存在,则追加记录。我想做这件事,因为如果我将我的 python 包给某人,他们不会在其环境中具有相同的增量表,因此应该从代码动态创建它。
如果您还没有Delta表,那么当您使用
append
模式时将会创建它。因此,您不需要编写任何特殊代码来处理表尚不存在和退出的情况。
附注仅当您执行合并到表中而不是追加时,才需要这样的代码。在这种情况下,代码将如下所示:
if table_exists:
do_merge
else:
df.write....
附注这是该模式的通用实现
saveAsTable
:- 使用当前 DataFrame 创建或替换表(如果存在或不存在)
insertInto
:- 如果表存在并基于模式(“覆盖”或“追加”)执行操作,则成功。它要求该表在数据库中可用。
.saveAsTable("events")
基本上每次调用它时都会重写该表。这意味着,即使您之前有一个表存在或不存在,它也会用当前的 DataFrame 值替换该表。相反,您可以执行以下操作以确保安全:第 1 步:创建表,无论表是否存在。如果存在,请从表中删除数据并附加新的数据框记录,否则创建表并附加数据。
df.createOrReplaceTempView('df_table')
spark.sql("create table IF NOT EXISTS table_name using delta select * from df_table where 1=2")
df.write.format("delta").mode("append").insertInto("events")
因此,每次它都会检查表是否可用,否则它将创建表并进入下一步。否则,如果表可用,则将数据追加到表中。
updates = df
dest = DeltaTable.forPath(spark, path)
dest.alias("events").merge(
updates.alias("updates"),
'events.Id = updates.Id and \
events.Type = updates.Type'
).whenMatchedUpdateAll(
).whenNotMatchedInsertAll(
).execute()