Pyspark-在数据框中用0填充空白月份

问题描述 投票:1回答:1

免责声明:我是pyspark的新手

我将数据框分组在idmonth_year上,以获取用户与您的产品互动的总次数。现在的问题是几个月没有活动,现在正在显示我的最终df。

这是如何完成的:

app_sessions_per_month = app_sessions.where("session_start_date_pt > '2019-05-25'").groupby('id','month_year').agg(F.sum('action').alias('count'))

+--------------------+----------+----------------------+
|             core_id|month_year|month_sum_detailaction|
+--------------------+----------+----------------------+
|aa01bb6f-2dd8-43e...|    7_2019|                     0|
|aa01bb6f-2dd8-43e...|    9_2019|                     0|
|aa01bb6f-2dd8-43e...|   10_2019|                     0|
+--------------------+----------+----------------------+

日期实际上是从05_201905_2020

原始DF:

id month_year count
1. 02_2020.   1
1. 03_2020.   4
1. 05_2020.   2
1. 06_2020.   7
1. 07_2020.   2

所需的DF:

id month_year count
1. 01_2020.   0
1. 02_2020.   1
1. 03_2020.   4
1  04_2020.   0
1. 05_2020.   2
1. 06_2020.   7
1. 07_2020.   2

现在此示例仅显示1 id,但我的实际数据集具有数百万个ID

任何帮助将不胜感激。

dataframe pyspark
1个回答
0
投票

尝试使用Spark2.4+。我们隔离next row is more than 1 month difference所在的行,然后在该行上使用sequence生成missing month/months,然后生成add 0 to count row using array_repeat,然后生成< [explode together using arrays_zip以获得所需的输出。

df.show() #sample dataframe #+---+----------+-----+ #| id|month_year|count| #+---+----------+-----+ #| 1| 01_2020| 1| #| 1| 02_2020| 4| #| 1| 04_2020| 2| #| 1| 05_2020| 7| #| 1| 06_2020| 2| #+---+----------+-----+ from pyspark.sql import functions as F from pyspark.sql.window import Window w=Window().partitionBy("id").orderBy("month_year") df.withColumn("month_year", F.to_date("month_year","MM_yyyy"))\ .withColumn("lead", F.lead("month_year").over(w))\ .withColumn("month_year", F.when((F.col("lead").isNotNull())&(F.months_between("lead","month_year")>1),\ F.expr("""sequence(month_year,lead - interval 1 month,interval 1 month)"""))\ .otherwise(F.array("month_year")))\ .withColumn("count1", F.when(F.size("month_year")>1, F.expr("""array_repeat(0,size(month_year)-1)"""))\ .otherwise(F.array()))\ .withColumn("count", F.flatten(F.array(F.array("count"),"count1")))\ .withColumn("zip", F.explode(F.arrays_zip("month_year","count")))\ .select("id","zip.*").withColumn("month_year", F.date_format("month_year","MM_yyyy")).show() #+---+----------+-----+ #|id |month_year|count| #+---+----------+-----+ #|1 |01_2020 |1 | #|1 |02_2020 |4 | #|1 |03_2020 |0 | #|1 |04_2020 |2 | #|1 |05_2020 |7 | #|1 |06_2020 |2 | #+---+----------+-----+
© www.soinside.com 2019 - 2024. All rights reserved.