假设这是我的 PySpark 数据框,按 ("ID", "updated_at") 排序:
身份证 | 更新于 | 库存日期 | 行数 |
---|---|---|---|
a1 | 2024-03-25T20:52:36 | 2024-03-25 | 1 |
a1 | 2024-03-26T11:23:48 | 2024-03-26 | 2 |
a1 | 2024-03-26T19:25:10 | 2024-03-26 | 3 |
b2 | 2024-03-24T14:12:20 | 2024-03-24 | 4 |
b2 | 2024-03-24T20:58:52 | 2024-03-24 | 5 |
b2 | 2024-03-26T22:24:14 | 2024-03-26 | 6 |
b2 | 2024-03-28T20:38:38 | 2024-03-28 | 7 |
c3 | 2024-03-28T15:15:51 | 2024-03-28 | 8 |
c3 | 2024-03-28T18:11:50 | 2024-03-28 | 9 |
d4 | 2024-03-24T12:10:15 | 2024-03-24 | 10 |
d4 | 2024-03-26T21:20:15 | 2024-03-26 | 11 |
d4 | 2024-03-28T11:55:23 | 2024-03-28 | 12 |
d4 | 2024-03-28T22:40:34 | 2024-03-28 | 13 |
d4 | 2024-03-29T11:57:20 | 2024-03-29 | 14 |
d4 | 2024-03-29T21:48:19 | 2024-03-29 | 15 |
我想返回多天内具有当天最新 ID 的所有行。
换句话说,如果同一个ID在同一天出现多次,它必须只返回最后一次更新的时间。如果它出现在不同的日子,请携带所有出现的情况。 因此,它只是行:1、3、5、6、7、9、10、11、13 和 15。
我已经为最新日期创建了一个窗口函数,但我不知道如何分隔不同的日期。我的表总是仅返回最新的一天。
我怎样才能实现这一目标?
我有一个解决方案,我分两步完成:
首先我使用 Windows 函数通过“ID”、“stock_date”进行分区来获取分区内的 row_number
在第二步中,我对第一步中的 df 进行分组,以获得给定分区的最大值
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, max, col, first
data = [
("a1", "2024-03-25T20:52:36", "2024-03-25", 1),
("a1", "2024-03-26T11:23:48", "2024-03-26", 2),
("a1", "2024-03-26T19:25:10", "2024-03-26", 3),
("b2", "2024-03-24T14:12:20", "2024-03-24", 4),
("b2", "2024-03-24T20:58:52", "2024-03-24", 5),
("b2", "2024-03-26T22:24:14", "2024-03-26", 6),
("b2", "2024-03-28T20:38:38", "2024-03-28", 7),
("c3", "2024-03-28T15:15:51", "2024-03-28", 8),
("c3", "2024-03-28T18:11:50", "2024-03-28", 9),
("d4", "2024-03-24T12:10:15", "2024-03-24", 10),
("d4", "2024-03-26T21:20:15", "2024-03-26", 11),
("d4", "2024-03-28T11:55:23", "2024-03-28", 12),
("d4", "2024-03-28T22:40:34", "2024-03-28", 13),
("d4", "2024-03-29T11:57:20", "2024-03-29", 14),
("d4", "2024-03-29T21:48:19", "2024-03-29", 15)
]
columns = ['ID', 'updated_at', 'stock_date', 'row_num']
df = spark.createDataFrame(data, columns)
windowSpec = Window.partitionBy("ID", "stock_date").orderBy("stock_date")
groupedDf = df.withColumn("row_number", row_number().over(windowSpec))
groupedDf.groupBy("ID", "stock_date").agg(
max('updated_at').alias('updated_at'),
max('row_num').alias('row_num'),
max("row_number")
).drop("max(row_number)").show()
输出:
+---+----------+-------------------+-------+
| ID|stock_date| updated_at|row_num|
+---+----------+-------------------+-------+
| a1|2024-03-25|2024-03-25T20:52:36| 1|
| a1|2024-03-26|2024-03-26T19:25:10| 3|
| b2|2024-03-24|2024-03-24T20:58:52| 5|
| b2|2024-03-26|2024-03-26T22:24:14| 6|
| b2|2024-03-28|2024-03-28T20:38:38| 7|
| c3|2024-03-28|2024-03-28T18:11:50| 9|
| d4|2024-03-24|2024-03-24T12:10:15| 10|
| d4|2024-03-26|2024-03-26T21:20:15| 11|
| d4|2024-03-28|2024-03-28T22:40:34| 13|
| d4|2024-03-29|2024-03-29T21:48:19| 15|
+---+----------+-------------------+-------+