Pyspark;在数据框上迭代以基于日期过滤器求和组

问题描述 投票:1回答:1

我有一个如下所示的数据框。并非所有客户都按每年和每月的组合记录购买。我要遍历并总结过去3个月,6个月和12个月内的购买金额。

我无法插入缺少月份的新行,因为我的数据集非常大。

我尝试过的东西a)将年份和月份转换为日期b)在不起作用的情况下使用总和和大小写。c)使用对行进行迭代来求和,但是我创建的日期是时间戳记,而减去则一直显示错误。

输入

Customer_ID, Purchase_Year, Purchase_Month, Purchases
1 2019 1 4
1 2019 2 6
1 2019 3 4
1 2019 4 2
2 2019 1 2
2 2019 5 3
3 2019 1 9

预期输出

Customer_ID, Purchase_Year, Purchase_Month, Purchases, L3M
1 2019 1 4 4
1 2019 2 6 10
1 2019 3 4 14
1 2019 4 2 12
2 2019 1 2 2
2 2019 5 3 3
3 2019 1 9 9

我的最初尝试有效(但不适用于丢失的行)

sqlContext.sql("""select *, sum(Purchases) over (partition by customer_id
                          order by Purchase_Year, Purchase_Month
                           rows between 3 preceding and current row) as total_s
 from customer""").show()
python loops pyspark pyspark-sql
1个回答
1
投票

[我认为您之前使用时间戳的方法是正确的,因为如果您过去的6个月是2018年,那么如何使用购买年份和购买月份从2019年到2018年。

[您可以将时间戳转换为long,然后在window函数中使用rangebetween遍历所需的天数,对于3个月,您可以使用当天的89天(总共90天)。

from pyspark.sql import functions as F
from pyspark.sql.window import Window
days= lambda i: i * 86400
w=Window().partitionBy("Customer_ID").orderBy("sec").rangeBetween(-days(89),0)
df.withColumn("sec", F.to_timestamp(F.concat("Purchase_Year","Purchase_Month"),"yyyyM").cast("long"))\
  .withColumn("L3", F.sum("Purchases").over(w)).orderBy("Customer_ID","Purchase_Month").drop("sec").show()

+-----------+-------------+--------------+---------+---+
|Customer_ID|Purchase_Year|Purchase_Month|Purchases| L3|
+-----------+-------------+--------------+---------+---+
|          1|         2019|             1|        4|  4|
|          1|         2019|             2|        6| 10|
|          1|         2019|             3|        4| 14|
|          1|         2019|             4|        2| 12|
|          2|         2019|             1|        2|  2|
|          2|         2019|             5|        3|  3|
|          3|         2019|             1|        9|  9|
+-----------+-------------+--------------+---------+---+
© www.soinside.com 2019 - 2024. All rights reserved.