想象一张桌子:
人员ID | 日期 | 已完成锻炼 |
---|---|---|
A | 2001年1月31日 | 1 |
A | 2001年1月2日 | 1 |
A | 2001年2月2日 | 1 |
A | 2001年3月2日 | 0 |
A | 2001年4月2日 | 1 |
B | 2001年2月2日 | 1 |
我想创建一个 pyspark 聚合函数,它将计算一个人连续锻炼了多少天。如果一个人有多个连续记录 - 选择最长的一个。
预期输出:
人员ID | 已完成锻炼 |
---|---|
A | 3 |
B | 1 |
由于我没有找到任何使用 pyspark 的解决方案 - 我尝试采用 pandas 方法。但未能将其翻译成pyspark。
创建一个窗口规范,按
PersonID
对 DataFrame 进行分组并按 Date
排序,然后使用 to_date
函数将字符串解析为日期类型。
W = Window.partitionBy('PersonID').orderBy('Date')
df1 = df.withColumn('Date', F.to_date('Date', format='dd-MM-yyyy'))
# df1.show()
# +--------+----------+--------------+
# |PersonID| Date|HasDoneWorkout|
# +--------+----------+--------------+
# | A|2001-01-31| 1|
# | A|2001-02-01| 1|
# | A|2001-02-02| 1|
# | A|2001-02-03| 0|
# | A|2001-02-04| 1|
# | B|2001-02-02| 1|
# +--------+----------+--------------+
计算前一行和当前行中日期之间的差异,以标记日期连续的行
diff = F.datediff('Date', F.lag('Date').over(W))
df1 = df1.withColumn('is_consecutive_day', F.coalesce(diff, F.lit(0)) == 1)
# df1.show()
# +--------+----------+--------------+------------------+
# |PersonID| Date|HasDoneWorkout|is_consecutive_day|
# +--------+----------+--------------+------------------+
# | A|2001-01-31| 1| false|
# | A|2001-02-01| 1| true|
# | A|2001-02-02| 1| true|
# | A|2001-02-03| 0| true|
# | A|2001-02-04| 1| true|
# | B|2001-02-02| 1| false|
# +--------+----------+--------------+------------------+
创建一个布尔列来标识具有连续日期的行以及该人完成锻炼的行。
df1 = df1.withColumn('is_workout_on_consecutive_day', F.col('is_consecutive_day') & (F.col('HasDoneWorkout') == 1))
# df1.show()
# +--------+----------+--------------+------------------+-----------------------------+
# |PersonID| Date|HasDoneWorkout|is_consecutive_day|is_workout_on_consecutive_day|
# +--------+----------+--------------+------------------+-----------------------------+
# | A|2001-01-31| 1| false| false|
# | A|2001-02-01| 1| true| true|
# | A|2001-02-02| 1| true| true|
# | A|2001-02-03| 0| true| false|
# | A|2001-02-04| 1| true| true|
# | B|2001-02-02| 1| false| false|
# +--------+----------+--------------+------------------+-----------------------------+
倒置条件的累积总和
is_workout_on_consecutive_day
以区分人们连续进行锻炼的不同行组
df1 = df1.withColumn('groups', F.sum((~F.col('is_workout_on_consecutive_day')).cast('int')).over(W))
# df1.show()
# +--------+----------+--------------+------------------+-----------------------------+------+
# |PersonID| Date|HasDoneWorkout|is_consecutive_day|is_workout_on_consecutive_day|groups|
# +--------+----------+--------------+------------------+-----------------------------+------+
# | A|2001-01-31| 1| false| false| 1|
# | A|2001-02-01| 1| true| true| 1|
# | A|2001-02-02| 1| true| true| 1|
# | A|2001-02-03| 0| true| false| 2|
# | A|2001-02-04| 1| true| true| 2|
# | B|2001-02-02| 1| false| false| 1|
# +--------+----------+--------------+------------------+-----------------------------+------+
按
PersonID
和 groups
对数据帧进行分组,并将 HasDoneWorkout
与 sum
聚合以获取所有连续条纹的计数
df1 = df1.groupBy('PersonID', 'groups').agg(F.sum('HasDoneWorkout').alias('streaks'))
# df1.show()
# +--------+------+-------+
# |PersonID|groups|streaks|
# +--------+------+-------+
# | A| 1| 3|
# | A| 2| 1|
# | B| 1| 1|
# +--------+------+-------+
再次按
PersonID
对数据帧进行分组并聚合以找到最大连续条纹
df1 = df1.groupBy('PersonID').agg(F.max('streaks').alias('streaks'))
# df1.show()
# +--------+-------+
# |PersonID|streaks|
# +--------+-------+
# | A| 3|
# | B| 1|
# +--------+-------+