有一个从Delta Table中删除数据的功能:
deltaTable = DeltaTable.forPath(spark, "/data/events/")
deltaTable.delete(col("date") < "2017-01-01")
但是还有办法以某种方式删除重复项吗?就像 deltaTable.dropDuplicates()...
我不想将整个表作为数据帧读取,删除重复项,然后再次将其重写到存储中
如果您有主键(例如 UUID),并且重复项基于特定的列组合(例如 Col1、Col2、Col3),那么您可以使用 ROW_NUMBER() 方法来获取该列的 UUID 列表要删除的重复行。附带说明一下,Delta 表目前没有 ROWID 或序列来自动生成主键。
如果您的重复项基于某个复合键(例如 Col2、Col4、Col7),则用于删除重复项的 ROW_NUMBER() 技巧将不起作用:它将删除该行的所有副本。对于这种情况,您可以结合使用 Delta MERGE 和 Delta Time Travel(版本控制)功能来消除 Delta 表中的重复项。以下是消除完全重复项(所有相应字段具有相同值的行)的步骤:
这是我在 Azure Databricks 上测试的一个 Python 示例,其中的 Delta 表存储在 ADLS 中。
from delta.tables import *
# Step 1
dfTemp = (
spark.sql("SELECT *, ROW_NUMBER() OVER (PARTITION BY Col2, Col4, Col7 ORDER BY Col9 DESC) rn FROM delta.`abfss://[email protected]/my-path/my-delta-table`")
).filter(f.col('rn') > 1).drop('rn').distinct()
# Step 2
deltaTemp = DeltaTable.forPath(spark, "abfss://[email protected]/my-path/my-delta-table")
deltaTemp.alias("main").merge(
dfTemp.alias("nodups"),
"main.Col2 = nodups.Col2 AND main.Col4 = nodups.Col4 AND main.Col7 = nodups.Col7").whenMatchedDelete().execute()
现在,获取我们在上面步骤 2 中执行的合并之前 Delta 表的版本号。我在我的 Azure Databricks 笔记本中以 SQL 形式运行此命令:
%sql
-- Step 3
DESCRIBE HISTORY delta.`abfss://[email protected]/my-path/my-delta-table`
假设我当前的 Delta 表版本是 50,而我之前的版本(带有重复项)是 49。您现在可以执行以下剩余步骤:
# Step 4
dfTemp = (
spark.sql("SELECT *, ROW_NUMBER() OVER (PARTITION BY Col2, Col4, Col7 ORDER BY Col9 DESC) rn FROM delta.`abfss://[email protected]/my-path/my-delta-table` VERSION AS OF 49")
).filter(f.col('rn') > 1).drop('rn').distinct()
# Step 5
deltaTemp = DeltaTable.forPath(spark, "abfss://[email protected]/my-path/my-delta-table")
deltaTemp.alias("main").merge(
dfTemp.alias("nodups"),
"main.Col2 = nodups.Col2 AND main.Col4 = nodups.Col4 AND main.Col7 = nodups.Col7").whenNotMatchedInsertAll().execute()
当您必须处理部分重复项(主键相同,但其他一些相应字段具有不同值的行)时,您可能最终会编写逻辑来“标记”哪一个重复行保留。逻辑将完全取决于您的特定用例。
是的,您可以直接从增量表中删除重复项。使用合并命令。以下命令将仅保留最新记录,其余冗余数据将被删除。
MERGE into [deltatable] as target
USING ( select *, ROW_NUMBER() OVER (Partition By [primary keys] Order By [date] desc) as rn from [deltatable]) t1 qualify rn> 1 ) as source
ON [merge primary keys and date column between source and target]
WHEN MATCHED THEN DELETE
还有其他多种方法,但它们很耗时。
示例:
MERGE into delta_table as target
USING ( select *, ROW_NUMBER() OVER (Partition By pk1 Order By date1 desc) as rn from delta_table ) t1 qualify rn> 1 ) as source
ON source.pk1 = target.pk1 and source.date1 = target.date1
WHEN MATCHED THEN DELETE
有可能,因为这是Databricks,方法更简单。
CREATE TABLE UnDuped AS
Select DISTINCT * FROM Table
这不适用于普通的 SQL Server 查询。 Databricks 使用 parquet 表,这是一种列式方法,因此区分重复项与 SQL Server 中的 RBAR(逐行)方式不同。我把功劳归功于 Vereesh,但如果这么简单,可能值得按照上面的方法进行,而不是使用 row_number() 函数或“匹配时删除”。但他的方法提供了看起来不错的替代方案。 或者
with cte as(
Select col1,col2,col3,etc
,row_number()over(partition by col1,col2,col3,etc order by col1)rowno
from table)
Delete from cte where rowno>1
确保每一列都包含在 row_number() 分区中,它将找到重复项,删除那些 rowno 大于 1 的列,然后它们就消失了。这里更重要的是 order by。例如,如果您希望最后一个作为重复项输入的文件成为删除的项目,则可以将日期从分区中删除并按日期排序,这将为您提供所有重复项并按日期(或本例中的日期 desc)对它们进行排序。 case) rowno 将指向最新日期,其余的将消失。
我们假设主键是 a,b,c 列,时间戳列是 d,我们将选择最新版本的数据
MERGE into delta.`/mnt/lei/dupTab` as target
USING (
with t as(
select a,b,c, d,ROW_NUMBER() OVER (Partition By a,b,c Order By d desc) as rn from delta.`/mnt/lei/dupTab`
)
select * from t where rn > 1
)
as source
ON source.a=target.a and source.b=target.b and source.c=target.c and source.d=target.d
WHEN MATCHED THEN DELETE
不是最有效但最简单的解决方案。
import pyspark.sql.functions as F
from delta.tables import DeltaTable
df = spark.read.format("delta").load("schema.table_name")
# let's limit only to records that may contain duplicates
# for instance: all records that have user_id is NULL
df = df.filter(F.isnull('user_id'))
# provide a subset of columns to identify duplicates
df = df.dropDuplicates(["birthday", "name"])
# let's store this somewhere, e.g. s3 folder
df.write.parquet("s3://bucket/temp_folder")
DELETE FROM schema.table_name WHERE user_id IS NULL
# our cached records are on s3, so:
df = spark.read.parquet("s3://bucket/temp_folder")
# merge into Delta Table providing a simple merge condition:
delta_table = DeltaTable.forName(spark, "schema.table_name")
delta_table.alias("t").merge(
df.alias("b"),
"b.user_id = t.user_id AND b.birthday = t.birthday AND b.name = t.name"
).whenNotMatchedInsertAll().execute()
希望这能给一些想法。可能可以简化并缓存在内存中而不是 S3 中,我还不太熟悉 Spark。
我编写了一个 python 函数来对增量表进行重复数据删除。它使用 Spark sql。
def dedup_delta_table(table='mytable',pks=['id','line'],orderby='order by update_date desc', db='clarity',userdb='userdb'):
### given a table remove rows with duplicated primary key
#table, string, table name
#pks, primary keys or unqiue keys of the table,
#orderby: sql order by clause. To dedup, for rows with same primary keys for , after order by specified column(s) , take first row
#db,database where the table resides
#userdb,a database to save the dedup rows temporarily
spark.sql(f"""use {db} """)
###sql query parts
groupby_str=','.join(pks)
joinOn_str=' AND '.join([f'a.{col} = b.{col}' for col in pks])
allcols=spark.sql(f"""select * from {table} limit 1""").columns
allcols_str=','.join(allcols)
###1.extract dup rows and dedup them: deduped rows
spark.sql(f"""
select {allcols_str}
from(
select *
,row_number()over(partition by {groupby_str} {orderby} ) rn
,count(*)over(partition by {groupby_str}) cnt
from {table}
)c
where cnt>=2 and rn=1
""").write.saveAsTable(f"{userdb}.{table}_dedup_rows_tb",mode='Overwrite')
##2.delete all dup rows by merge
spark.sql(f"""
MERGE INTO {table} a
USING {userdb}.{table}_dedup_rows_tb b
on {joinOn_str}
WHEN MATCHED THEN DELETE
""").toPandas()
##3.insert deduped rows
spark.sql(f"""
INSERT INTO {table}
SELECT *
from {userdb}.{table}_dedup_rows_tb
""")
#drop the temporary table
spark.sql(f"""drop table if exists {userdb}.{table}_dedup_rows_tb """