我有一个 pyspark 数据框,如下所示:
Mail sno mail_date date1 present
[email protected] 790 2024-01-01 2024-02-06 yes
[email protected] 790 2023-12-23 2023-01-01
[email protected] 101 2022-02-23
[email protected] 101 2021-01-20 2022-07-09 yes
在最终的数据框中,我需要
sno
的一条记录,其中包含所有最大日期值以及 mail_date 列中对应的 present
的最大日期
所以最终的数据框应该是这样的:
Mail sno mail_date date1 present
[email protected] 790 2024-01-01 2024-02-06 yes
[email protected] 101 2022-02-23 2022-07-09
我有以下代码,
windowSpec=Window.partitionBy('Mail','sno')
df= df.withColumn('max_mail_date', F.max('mail_date').over(windowSpec))\
.withColumn('max_date1', F.max('date1').over(windowSpec))
df1 = df.withColumn('mail_date', F.when(F.col('mail_date').isNotNull(), F.col('max_mail_date')).otherwise(F.col('mail_date')))\
.drop('max_mail_date').dropDuplicates()
在这里,我在当前列中没有得到预期值。 请提出任何更改
如果您创建一个按
'Mail', 'sno'
分区并按 mail_date
排序的窗口,请在此窗口上创建一个排名列,并获取排名最高的 mail_date
以及与其对应的 present
列。然后通过取 df 并按 max date1
分组得到 'Mail', 'sno'
,并将两个结果连接到 'Mail', 'sno'
上。可能有一种更聪明的方法可以在不涉及任何连接的情况下做到这一点,但我不确定如何做。
window_spec = Window.partitionBy('Mail', 'sno').orderBy(F.desc('mail_date'))
df_with_max_date1 = df.groupby(
'Mail','sno'
).agg(
F.max('date1').alias('max_date1'),
)
df.withColumn(
'rank', F.row_number().over(window_spec)
).where(
F.col('rank') == 1
).select(
'Mail','sno','mail_date','present'
).join(
df_with_max_date1,
on=['Mail','sno'],
how='inner'
).select(
'Mail',
'sno',
'mail_date',
'max_date1',
'present'
)
这给出了以下结果:
+-----------+---+----------+----------+-------+
| Mail|sno| mail_date| max_date1|present|
+-----------+---+----------+----------+-------+
|[email protected]|790|2024-01-01|2024-02-06| yes|
|[email protected]|101|2022-02-23|2022-07-09| |
+-----------+---+----------+----------+-------+