在具有空值的列上使用最大值/最小值

问题描述 投票:0回答:1

我有以下数据框

df
-

+---+-------------------+----+
| id|               date|site|
+---+-------------------+----+
|100|2020-03-24 00:00:00|   a|
|100|2019-08-30 00:00:00|   a|
|100|2020-03-24 00:00:00|   b|
|101|2019-12-20 00:00:00|NULL|
|101|2019-12-20 00:00:00|   a|
|102|2019-04-14 00:00:00|NULL| 
|103|2019-09-28 00:00:00|   c|
+---+-------------------+----+

其中日期是

TimestampType
,站点是字符串。

我的目标是使用以下逻辑删除具有相同 id 的重复行: 对于每个 id,仅保留具有最新日期的行(日期不能为空)。 如果有两行或更多行包含该日期,请选择具有非空站点的任意行(站点可以为空),因此看起来最简单的方法是取最大值或最小值。

我设法使用以下代码仅保留最新日期(取自此处)-

w = Window.partitionBy('id')
df2 = df.withColumn('maxDate', f.max('date').over(w)) \
    .where(f.col('date') == f.col('maxDate')) \
    .drop('maxDate')
    

结果为

df2
-

+---+-------------------+----+
| id|               date|site|
+---+-------------------+----+
|100|2020-03-24 00:00:00|   a|
|100|2020-03-24 00:00:00|   b|
|101|2019-12-20 00:00:00|NULL|
|101|2019-12-20 00:00:00|   a|
|102|2019-04-14 00:00:00|NULL|
|103|2019-09-28 00:00:00|   c|
+---+-------------------+----+

然后我尝试在该网站上做类似的事情 -

w = Window.partitionBy('id')
df3 = df2.withColumn('maxSite', f.max('site').over(w)) \
    .where(f.col('site') == f.col('maxSite')) \
    .drop('maxSite')
    

但是结果是

df3
-

+---+-------------------+----+
| id|               date|site|
+---+-------------------+----+
|100|2020-03-24 00:00:00|   b|
|101|2019-12-20 00:00:00|   a|
|103|2019-09-28 00:00:00|   c|
+---+-------------------+----+

id 102 丢失,看起来 max(或 min)会跳过空值,并且由于 id 102 只有一行具有空值,因此找不到它。
我设法通过在第一个数据帧和最后一个数据帧之间进行左反连接来克服这个问题,所以我得到了所有只有空站点的 id,然后使用 union 来统一所有结果 -

df_left_anti = df.join(df3, df['id'] == df3['id'], 'left_anti')
df_all = df3.union(df_left_anti).orderBy('id')

我的问题是 - 这是正确且有效的做法吗?我可以使用带空值的最大值/最小值吗?

join pyspark
1个回答
0
投票

我不会使用 max,而是简单地使用

row_number
。您计算每个 id 的 row_number、按日期和站点排序。如果您使用正常排序,它将相当于“最小值”,但在这里,我使用 order desc 相当于“最大值”。您可以使用方法
desc_nulls_last
而不是简单的
desc

来确保不优先考虑空值

然后,您只需选择“第一”行(每个 id 总是有第一行)。

from pyspark.sql import functions as F, Window as W

df.withColumn(
    "rnk",
    F.row_number().over(
        W.partitionBy("id").orderBy(
            F.col("site").desc_nulls_last(),
            F.col("date").desc(),
        )
    ),
).where(F.col("rnk") == 1).drop("rnk")
+---+----------+----+
| id|      date|site|
+---+----------+----+
|100|2020-03-24|   b|
|101|2019-12-20|   a|
|102|2019-04-14|null|
|103|2019-09-28|   c|
+---+----------+----+
© www.soinside.com 2019 - 2024. All rights reserved.