如何删除Delta表中的重复项?

问题描述 投票:0回答:6

有一个从Delta Table中删除数据的功能:

deltaTable = DeltaTable.forPath(spark, "/data/events/")
deltaTable.delete(col("date") < "2017-01-01")

但是还有办法以某种方式删除重复项吗?就像 deltaTable.dropDuplicates()...

我不想将整个表作为数据帧读取,删除重复项,然后再次将其重写到存储中

apache-spark duplicates delta-lake
6个回答
8
投票

如果您有主键(例如 UUID),并且重复项基于特定的列组合(例如 Col1、Col2、Col3),那么您可以使用 ROW_NUMBER() 方法来获取该列的 UUID 列表要删除的重复行。附带说明一下,Delta 表目前没有 ROWID 或序列来自动生成主键。

如果您的重复项基于某个复合键(例如 Col2、Col4、Col7),则用于删除重复项的 ROW_NUMBER() 技巧将不起作用:它将删除该行的所有副本。对于这种情况,您可以结合使用 Delta MERGE 和 Delta Time Travel(版本控制)功能来消除 Delta 表中的重复项。以下是消除完全重复项(所有相应字段具有相同值的行)的步骤:

  1. 获取一个数据框,其中包含在 Delta 表中具有重复项的不同行。 ROW_NUMBER() 函数将在这里为您提供帮助。
  2. 使用 MERGE 操作和 WHEN MATCHED DELETE 删除这些行。请注意,如果同一行有 4 个副本,它将删除所有 4 个副本。我们将在第 5 步中重新添加非重复项。
  3. 使用DESCRIBE HISTORY命令获取当前Delta表之前的版本号。该版本仍然具有所有重复项,而当前版本则没有。
  4. 重复步骤 1,但这次使用 VERSION AS OF 选项来获取包含我们在步骤 2 中删除的不同行的数据帧。
  5. 使用 MEREGE 操作和 WHEN NOT MATCHED INSERT ALL 添加过去具有重复项的不同行。

这是我在 Azure Databricks 上测试的一个 Python 示例,其中的 Delta 表存储在 ADLS 中。

from delta.tables import *

# Step 1
dfTemp = (
  spark.sql("SELECT *, ROW_NUMBER() OVER (PARTITION BY Col2, Col4, Col7 ORDER BY Col9 DESC) rn FROM delta.`abfss://[email protected]/my-path/my-delta-table`")
           ).filter(f.col('rn') > 1).drop('rn').distinct()

# Step 2
deltaTemp = DeltaTable.forPath(spark, "abfss://[email protected]/my-path/my-delta-table")

deltaTemp.alias("main").merge(
    dfTemp.alias("nodups"),
    "main.Col2 = nodups.Col2 AND main.Col4 = nodups.Col4 AND main.Col7 = nodups.Col7").whenMatchedDelete().execute()

现在,获取我们在上面步骤 2 中执行的合并之前 Delta 表的版本号。我在我的 Azure Databricks 笔记本中以 SQL 形式运行此命令:

%sql

-- Step 3
DESCRIBE HISTORY delta.`abfss://[email protected]/my-path/my-delta-table`

假设我当前的 Delta 表版本是 50,而我之前的版本(带有重复项)是 49。您现在可以执行以下剩余步骤:

# Step 4
dfTemp = (
  spark.sql("SELECT *, ROW_NUMBER() OVER (PARTITION BY Col2, Col4, Col7 ORDER BY Col9 DESC) rn FROM delta.`abfss://[email protected]/my-path/my-delta-table` VERSION AS OF 49")
           ).filter(f.col('rn') > 1).drop('rn').distinct()

# Step 5
deltaTemp = DeltaTable.forPath(spark, "abfss://[email protected]/my-path/my-delta-table")

deltaTemp.alias("main").merge(
    dfTemp.alias("nodups"),
    "main.Col2 = nodups.Col2 AND main.Col4 = nodups.Col4 AND main.Col7 = nodups.Col7").whenNotMatchedInsertAll().execute()

当您必须处理部分重复项(主键相同,但其他一些相应字段具有不同值的行)时,您可能最终会编写逻辑来“标记”哪一个重复行保留。逻辑将完全取决于您的特定用例。


5
投票

是的,您可以直接从增量表中删除重复项。使用合并命令。以下命令将仅保留最新记录,其余冗余数据将被删除。

MERGE into [deltatable] as target
USING ( select *, ROW_NUMBER() OVER (Partition By [primary keys] Order By [date] desc) as rn  from [deltatable]) t1 qualify rn> 1 ) as source
ON [merge primary keys and date column between source and target]
WHEN MATCHED THEN DELETE

还有其他多种方法,但它们很耗时。

示例:

MERGE into delta_table as target
USING ( select *, ROW_NUMBER() OVER (Partition By pk1 Order By date1 desc) as rn  from delta_table ) t1 qualify rn> 1 ) as source
ON source.pk1 = target.pk1 and source.date1 = target.date1
WHEN MATCHED THEN DELETE

1
投票

有可能,因为这是Databricks,方法更简单。

CREATE TABLE UnDuped AS
Select DISTINCT * FROM Table

这不适用于普通的 SQL Server 查询。 Databricks 使用 parquet 表,这是一种列式方法,因此区分重复项与 SQL Server 中的 RBAR(逐行)方式不同。我把功劳归功于 Vereesh,但如果这么简单,可能值得按照上面的方法进行,而不是使用 row_number() 函数或“匹配时删除”。但他的方法提供了看起来不错的替代方案。 或者

with cte as(
Select col1,col2,col3,etc
,row_number()over(partition by col1,col2,col3,etc order by col1)rowno 
from table) 
Delete from cte where rowno>1

确保每一列都包含在 row_number() 分区中,它将找到重复项,删除那些 rowno 大于 1 的列,然后它们就消失了。这里更重要的是 order by。例如,如果您希望最后一个作为重复项输入的文件成为删除的项目,则可以将日期从分区中删除并按日期排序,这将为您提供所有重复项并按日期(或本例中的日期 desc)对它们进行排序。 case) rowno 将指向最新日期,其余的将消失。


0
投票

我们假设主键是 a,b,c 列,时间戳列是 d,我们将选择最新版本的数据

MERGE into delta.`/mnt/lei/dupTab` as target
USING (
      with t as(
      select a,b,c, d,ROW_NUMBER() OVER (Partition By a,b,c Order By d desc) as rn  from delta.`/mnt/lei/dupTab`
      )
      select * from t where rn > 1
 
) 
as source
ON source.a=target.a and source.b=target.b and source.c=target.c and source.d=target.d
WHEN MATCHED THEN DELETE

0
投票

不是最有效但最简单的解决方案。

  1. 从增量表中读取可能包含重复项的行子集,删除重复项并将其暂时存储在某处。
import pyspark.sql.functions as F
from delta.tables import DeltaTable

df = spark.read.format("delta").load("schema.table_name")

# let's limit only to records that may contain duplicates
# for instance: all records that have user_id is NULL
df = df.filter(F.isnull('user_id'))

# provide a subset of columns to identify duplicates
df = df.dropDuplicates(["birthday", "name"])

# let's store this somewhere, e.g. s3 folder
df.write.parquet("s3://bucket/temp_folder")
  1. 从增量表中删除包含重复项的完整记录子集(sql 代码):
DELETE FROM schema.table_name WHERE user_id IS NULL
  1. 将缓存数据合并回 Delta 表:
# our cached records are on s3, so:
df = spark.read.parquet("s3://bucket/temp_folder")

# merge into Delta Table providing a simple merge condition:
delta_table = DeltaTable.forName(spark, "schema.table_name")
delta_table.alias("t").merge(
    df.alias("b"),
    "b.user_id = t.user_id AND b.birthday = t.birthday AND b.name = t.name"
).whenNotMatchedInsertAll().execute()

希望这能给一些想法。可能可以简化并缓存在内存中而不是 S3 中,我还不太熟悉 Spark。


0
投票

我编写了一个 python 函数来对增量表进行重复数据删除。它使用 Spark sql。

def dedup_delta_table(table='mytable',pks=['id','line'],orderby='order by update_date desc', db='clarity',userdb='userdb'):
    ### given a table remove rows with duplicated primary key
    #table, string, table name
    #pks, primary keys  or unqiue keys of the table,
    #orderby: sql order by clause. To dedup, for rows with same primary keys for , after order by specified column(s) , take first row
    #db,database where the table resides 
    #userdb,a database to save the dedup rows temporarily

    spark.sql(f"""use    {db}  """)

    ###sql query parts
    groupby_str=','.join(pks)
    joinOn_str=' AND '.join([f'a.{col} = b.{col}' for col in pks])
    allcols=spark.sql(f"""select *   from  {table}   limit 1""").columns
    allcols_str=','.join(allcols)

    ###1.extract dup rows and dedup them:  deduped rows
    spark.sql(f"""
    select {allcols_str}
    from(
        select *
        ,row_number()over(partition by {groupby_str} {orderby} ) rn   
        ,count(*)over(partition by {groupby_str}) cnt
        from   {table} 
        )c
    where cnt>=2 and rn=1 
    """).write.saveAsTable(f"{userdb}.{table}_dedup_rows_tb",mode='Overwrite')
    
    ##2.delete all dup rows by merge
    spark.sql(f"""
    MERGE INTO {table} a
    USING {userdb}.{table}_dedup_rows_tb b
    on {joinOn_str}
    WHEN MATCHED THEN DELETE
    """).toPandas()

    ##3.insert deduped rows
    spark.sql(f"""
    INSERT INTO {table}
    SELECT * 
    from {userdb}.{table}_dedup_rows_tb
    """)
    #drop the temporary table
    spark.sql(f"""drop table if exists   {userdb}.{table}_dedup_rows_tb    """
© www.soinside.com 2019 - 2024. All rights reserved.