在聚合期间计算最长的日期条纹 - 使用 pyspark

问题描述 投票:0回答:1

想象一张桌子:

人员ID 日期 已完成锻炼
A 2001年1月31日 1
A 2001年1月2日 1
A 2001年2月2日 1
A 2001年3月2日 0
A 2001年4月2日 1
B 2001年2月2日 1

我想创建一个 pyspark 聚合函数,它将计算一个人连续锻炼了多少天。如果一个人有多个连续记录 - 选择最长的一个。

预期输出:

人员ID 已完成锻炼
A 3
B 1

由于我没有找到任何使用 pyspark 的解决方案 - 我尝试采用 pandas 方法。但未能将其翻译成pyspark。

python dataframe pyspark time-series aws-glue
1个回答
0
投票

逐步解决方案

创建一个窗口规范,按

PersonID
对 DataFrame 进行分组并按
Date
排序,然后使用
to_date
函数将字符串解析为日期类型。

W = Window.partitionBy('PersonID').orderBy('Date')
df1 = df.withColumn('Date', F.to_date('Date', format='dd-MM-yyyy'))

# df1.show()
# +--------+----------+--------------+
# |PersonID|      Date|HasDoneWorkout|
# +--------+----------+--------------+
# |       A|2001-01-31|             1|
# |       A|2001-02-01|             1|
# |       A|2001-02-02|             1|
# |       A|2001-02-03|             0|
# |       A|2001-02-04|             1|
# |       B|2001-02-02|             1|
# +--------+----------+--------------+

计算前一行和当前行中日期之间的差异,以标记日期连续的行

diff = F.datediff('Date', F.lag('Date').over(W))
df1 = df1.withColumn('is_consecutive_day', F.coalesce(diff, F.lit(0)) == 1)

# df1.show()
# +--------+----------+--------------+------------------+
# |PersonID|      Date|HasDoneWorkout|is_consecutive_day|
# +--------+----------+--------------+------------------+
# |       A|2001-01-31|             1|             false|
# |       A|2001-02-01|             1|              true|
# |       A|2001-02-02|             1|              true|
# |       A|2001-02-03|             0|              true|
# |       A|2001-02-04|             1|              true|
# |       B|2001-02-02|             1|             false|
# +--------+----------+--------------+------------------+

创建一个布尔列来标识具有连续日期的行以及该人完成锻炼的行。

df1 = df1.withColumn('is_workout_on_consecutive_day', F.col('is_consecutive_day') & (F.col('HasDoneWorkout') == 1))

# df1.show()
# +--------+----------+--------------+------------------+-----------------------------+
# |PersonID|      Date|HasDoneWorkout|is_consecutive_day|is_workout_on_consecutive_day|
# +--------+----------+--------------+------------------+-----------------------------+
# |       A|2001-01-31|             1|             false|                        false|
# |       A|2001-02-01|             1|              true|                         true|
# |       A|2001-02-02|             1|              true|                         true|
# |       A|2001-02-03|             0|              true|                        false|
# |       A|2001-02-04|             1|              true|                         true|
# |       B|2001-02-02|             1|             false|                        false|
# +--------+----------+--------------+------------------+-----------------------------+

倒置条件的累积总和

is_workout_on_consecutive_day
以区分人们连续进行锻炼的不同行组

df1 = df1.withColumn('groups', F.sum((~F.col('is_workout_on_consecutive_day')).cast('int')).over(W))


# df1.show()
# +--------+----------+--------------+------------------+-----------------------------+------+
# |PersonID|      Date|HasDoneWorkout|is_consecutive_day|is_workout_on_consecutive_day|groups|
# +--------+----------+--------------+------------------+-----------------------------+------+
# |       A|2001-01-31|             1|             false|                        false|     1|
# |       A|2001-02-01|             1|              true|                         true|     1|
# |       A|2001-02-02|             1|              true|                         true|     1|
# |       A|2001-02-03|             0|              true|                        false|     2|
# |       A|2001-02-04|             1|              true|                         true|     2|
# |       B|2001-02-02|             1|             false|                        false|     1|
# +--------+----------+--------------+------------------+-----------------------------+------+

PersonID
groups
对数据帧进行分组,并将
HasDoneWorkout
sum
聚合以获取所有连续条纹的计数

df1 = df1.groupBy('PersonID', 'groups').agg(F.sum('HasDoneWorkout').alias('streaks'))

# df1.show()
# +--------+------+-------+
# |PersonID|groups|streaks|
# +--------+------+-------+
# |       A|     1|      3|
# |       A|     2|      1|
# |       B|     1|      1|
# +--------+------+-------+

再次按

PersonID
对数据帧进行分组并聚合以找到最大连续条纹

df1 = df1.groupBy('PersonID').agg(F.max('streaks').alias('streaks'))

# df1.show()
# +--------+-------+
# |PersonID|streaks|
# +--------+-------+
# |       A|      3|
# |       B|      1|
# +--------+-------+
© www.soinside.com 2019 - 2024. All rights reserved.