返回不同日期的最后更新日期的行

问题描述 投票:0回答:1

假设这是我的 PySpark 数据框,按 ("ID", "updated_at") 排序:

身份证 更新于 库存日期 行数
a1 2024-03-25T20:52:36 2024-03-25 1
a1 2024-03-26T11:23:48 2024-03-26 2
a1 2024-03-26T19:25:10 2024-03-26 3
b2 2024-03-24T14:12:20 2024-03-24 4
b2 2024-03-24T20:58:52 2024-03-24 5
b2 2024-03-26T22:24:14 2024-03-26 6
b2 2024-03-28T20:38:38 2024-03-28 7
c3 2024-03-28T15:15:51 2024-03-28 8
c3 2024-03-28T18:11:50 2024-03-28 9
d4 2024-03-24T12:10:15 2024-03-24 10
d4 2024-03-26T21:20:15 2024-03-26 11
d4 2024-03-28T11:55:23 2024-03-28 12
d4 2024-03-28T22:40:34 2024-03-28 13
d4 2024-03-29T11:57:20 2024-03-29 14
d4 2024-03-29T21:48:19 2024-03-29 15

我想返回多天内具有当天最新 ID 的所有行。

换句话说,如果同一个ID在同一天出现多次,它必须只返回最后一次更新的时间。如果它出现在不同的日子,请携带所有出现的情况。 因此,它只是行:1、3、5、6、7、9、10、11、13 和 15。

我已经为最新日期创建了一个窗口函数,但我不知道如何分隔不同的日期。我的表总是仅返回最新的一天。

我怎样才能实现这一目标?

sql pyspark partition days
1个回答
0
投票

我有一个解决方案,我分两步完成:

首先我使用 Windows 函数通过“ID”、“stock_date”进行分区来获取分区内的 row_number

在第二步中,我对第一步中的 df 进行分组,以获得给定分区的最大值

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, max, col, first

data = [
    ("a1", "2024-03-25T20:52:36", "2024-03-25", 1),
    ("a1", "2024-03-26T11:23:48", "2024-03-26", 2),
    ("a1", "2024-03-26T19:25:10", "2024-03-26", 3),
    ("b2", "2024-03-24T14:12:20", "2024-03-24", 4),
    ("b2", "2024-03-24T20:58:52", "2024-03-24", 5),
    ("b2", "2024-03-26T22:24:14", "2024-03-26", 6),
    ("b2", "2024-03-28T20:38:38", "2024-03-28", 7),
    ("c3", "2024-03-28T15:15:51", "2024-03-28", 8),
    ("c3", "2024-03-28T18:11:50", "2024-03-28", 9),
    ("d4", "2024-03-24T12:10:15", "2024-03-24", 10),
    ("d4", "2024-03-26T21:20:15", "2024-03-26", 11),
    ("d4", "2024-03-28T11:55:23", "2024-03-28", 12),
    ("d4", "2024-03-28T22:40:34", "2024-03-28", 13),
    ("d4", "2024-03-29T11:57:20", "2024-03-29", 14),
    ("d4", "2024-03-29T21:48:19", "2024-03-29", 15)
]

columns = ['ID', 'updated_at', 'stock_date', 'row_num']
df = spark.createDataFrame(data, columns)

windowSpec  = Window.partitionBy("ID", "stock_date").orderBy("stock_date")
groupedDf = df.withColumn("row_number", row_number().over(windowSpec))

groupedDf.groupBy("ID", "stock_date").agg(
    max('updated_at').alias('updated_at'),
    max('row_num').alias('row_num'),
    max("row_number")
).drop("max(row_number)").show()

输出:

+---+----------+-------------------+-------+
| ID|stock_date|         updated_at|row_num|
+---+----------+-------------------+-------+
| a1|2024-03-25|2024-03-25T20:52:36|      1|
| a1|2024-03-26|2024-03-26T19:25:10|      3|
| b2|2024-03-24|2024-03-24T20:58:52|      5|
| b2|2024-03-26|2024-03-26T22:24:14|      6|
| b2|2024-03-28|2024-03-28T20:38:38|      7|
| c3|2024-03-28|2024-03-28T18:11:50|      9|
| d4|2024-03-24|2024-03-24T12:10:15|     10|
| d4|2024-03-26|2024-03-26T21:20:15|     11|
| d4|2024-03-28|2024-03-28T22:40:34|     13|
| d4|2024-03-29|2024-03-29T21:48:19|     15|
+---+----------+-------------------+-------+
© www.soinside.com 2019 - 2024. All rights reserved.