我有一个如下所示的数据框。并非所有客户都按每年和每月的组合记录购买。我要遍历并总结过去3个月,6个月和12个月内的购买金额。
我无法插入缺少月份的新行,因为我的数据集非常大。
我尝试过的东西a)将年份和月份转换为日期b)在不起作用的情况下使用总和和大小写。c)使用对行进行迭代来求和,但是我创建的日期是时间戳记,而减去则一直显示错误。
输入
Customer_ID, Purchase_Year, Purchase_Month, Purchases
1 2019 1 4
1 2019 2 6
1 2019 3 4
1 2019 4 2
2 2019 1 2
2 2019 5 3
3 2019 1 9
预期输出
Customer_ID, Purchase_Year, Purchase_Month, Purchases, L3M
1 2019 1 4 4
1 2019 2 6 10
1 2019 3 4 14
1 2019 4 2 12
2 2019 1 2 2
2 2019 5 3 3
3 2019 1 9 9
我的最初尝试有效(但不适用于丢失的行)
sqlContext.sql("""select *, sum(Purchases) over (partition by customer_id
order by Purchase_Year, Purchase_Month
rows between 3 preceding and current row) as total_s
from customer""").show()
[我认为您之前使用时间戳的方法是正确的,因为如果您过去的6个月是2018年,那么如何使用购买年份和购买月份从2019年到2018年。
[您可以将时间戳转换为long,然后在window函数中使用rangebetween遍历所需的天数,对于3个月,您可以使用当天的89天(总共90天)。
from pyspark.sql import functions as F
from pyspark.sql.window import Window
days= lambda i: i * 86400
w=Window().partitionBy("Customer_ID").orderBy("sec").rangeBetween(-days(89),0)
df.withColumn("sec", F.to_timestamp(F.concat("Purchase_Year","Purchase_Month"),"yyyyM").cast("long"))\
.withColumn("L3", F.sum("Purchases").over(w)).orderBy("Customer_ID","Purchase_Month").drop("sec").show()
+-----------+-------------+--------------+---------+---+
|Customer_ID|Purchase_Year|Purchase_Month|Purchases| L3|
+-----------+-------------+--------------+---------+---+
| 1| 2019| 1| 4| 4|
| 1| 2019| 2| 6| 10|
| 1| 2019| 3| 4| 14|
| 1| 2019| 4| 2| 12|
| 2| 2019| 1| 2| 2|
| 2| 2019| 5| 3| 3|
| 3| 2019| 1| 9| 9|
+-----------+-------------+--------------+---------+---+