PySpark - Deduplicate CDC transactions in a DataFrame, retaining the latest record per primary key

Problem description

I am working on a data pet project exploring the Delta format and Delta tables. The initial-load file contains an Op column with the operation flag "I"; I read it in PySpark and save it in Delta format.

Subsequent files are change data capture (CDC) output. I run into a problem when I load them into a PySpark DataFrame and try to deduplicate them.
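For context, here is a minimal sketch of how the initial full load might be written out and read back in Delta format; the file paths and the CSV source are assumptions for illustration, not details from the project:

```python
# Hypothetical sketch: persist the initial full load (all Op = 'I') as a Delta table.
# The paths "/tmp/landing/initial_load.csv" and "/tmp/delta/transactions" are assumptions.
initial_df = spark.read.option("header", True).csv("/tmp/landing/initial_load.csv")
initial_df.write.format("delta").mode("overwrite").save("/tmp/delta/transactions")

# Read the Delta table back as a DataFrame
base_df = spark.read.format("delta").load("/tmp/delta/transactions")
```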

from pyspark.sql.functions import to_timestamp, col, row_number
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.sql.window import Window

# Sample data as a list of tuples
data = [
    ("I", "2024-03-22 22:49:56.000000", 71, 104.75),
    ("U", "2024-03-22 22:50:00.000000", 72, 114.75),
    ("I", "2024-03-22 22:49:56.000000", 73, 10.00),
    ("U", "2024-03-22 22:50:56.000000", 73, 20.00),
    ("U", "2024-03-22 22:51:56.000000", 73, 30.00),
    ("I", "2024-03-22 22:55:57.000000", 74, 30.00),
    ("U", "2024-03-22 22:49:56.000000", 75, 40.00),
    ("U", "2024-03-22 22:52:56.000000", 75, 50.00),
    ("U", "2024-03-22 22:57:56.000000", 75, 60.00)
]

# Define the schema 
schema = StructType([
    StructField("Op", StringType(), True),
    StructField("sourceRecordTime", StringType(), True),
    StructField("id", IntegerType(), True),
    StructField("amount", DoubleType(), True)
])

# Create the DataFrame
df = spark.createDataFrame(data, schema)
df = df.withColumn("sourceRecordTime", to_timestamp(df["sourceRecordTime"], "yyyy-MM-dd HH:mm:ss.SSSSSS"))

# Conditions
# If an id has only an 'I' record, retain that record.
# If the latest record for an id is a single 'U', retain that record.
# If an id has multiple 'U' records, retain the latest one based on sourceRecordTime.
# If an id has both 'I' and 'U' records, retain the latest 'U' record and change its Op from 'U' to 'I' in the final output.

sort_order = Window.partitionBy(col('id')).orderBy(col('sourceRecordTime').desc())
update_df = df.withColumn("rec_val", row_number().over(sort_order)).filter("rec_val=1").drop("rec_val")
update_df.show()

Output:
+---+-------------------+---+------+
| Op|   sourceRecordTime| id|amount|
+---+-------------------+---+------+
|  I|2024-03-22 22:49:56| 71|104.75|
|  U|2024-03-22 22:50:00| 72|114.75|
|  U|2024-03-22 22:51:56| 73|  30.0|
|  I|2024-03-22 22:55:57| 74|  30.0|
|  U|2024-03-22 22:57:56| 75|  60.0|
+---+-------------------+---+------+

Expected Output:
+---+-------------------+---+------+
| Op|   sourceRecordTime| id|amount|
+---+-------------------+---+------+
|  I|2024-03-22 22:49:56| 71|104.75|
|  U|2024-03-22 22:50:00| 72|114.75|
|  I|2024-03-22 22:51:56| 73|  30.0|
|  I|2024-03-22 22:55:57| 74|  30.0|
|  U|2024-03-22 22:57:56| 75|  60.0|
+---+-------------------+---+------+
dataframe apache-spark pyspark delta
1 Answer

You are almost there, since the last record per id already satisfies the first three conditions. You can cover the last condition by adding another window, partitioned only by id, and taking the min of the Op column over it (PySpark's min, i.e. pyspark.sql.functions.min). This works because, compared as strings, "I" < "U".

from pyspark.sql.functions import min as spark_min  # avoid shadowing Python's built-in min

update_df = (
    df
    # 'I' < 'U' as strings, so the per-id min of Op is 'I' whenever an insert exists for that id
    .withColumn("Op", spark_min("Op").over(Window.partitionBy(col("id"))))
    .withColumn("rec_val", row_number().over(sort_order))
    .filter("rec_val = 1")
    .drop("rec_val")
)
update_df.show()

+---+-------------------+---+------+
| Op|   sourceRecordTime| id|amount|
+---+-------------------+---+------+
|  I|2024-03-22 22:49:56| 71|104.75|
|  U|2024-03-22 22:50:00| 72|114.75|
|  I|2024-03-22 22:51:56| 73|  30.0|
|  I|2024-03-22 22:55:57| 74|  30.0|
|  U|2024-03-22 22:57:56| 75|  60.0|
+---+-------------------+---+------+
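If the goal of the pet project is to apply this deduplicated CDC batch back to the Delta table, one possible follow-up (not part of the answer; the table path and join key are assumptions) is a Delta MERGE keyed on id:

```python
from delta.tables import DeltaTable

# Hypothetical upsert of the deduplicated CDC batch into the existing Delta table.
# "/tmp/delta/transactions" and the join condition "t.id = s.id" are assumptions for illustration.
target = DeltaTable.forPath(spark, "/tmp/delta/transactions")

(
    target.alias("t")
    .merge(update_df.alias("s"), "t.id = s.id")
    .whenMatchedUpdateAll()      # update existing ids with the latest values
    .whenNotMatchedInsertAll()   # insert ids not yet in the table
    .execute()
)
```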