AWS Glue PySpark - convert one row in the source table into multiple rows in the target

Question (votes: 1, answers: 1)

I have the following requirement:

[Image from the original post: a sample source row with start_date and daily_qty_1 … daily_qty_5 columns, and the desired target table with one row per day.]

How can I achieve this with the PySpark explode function?

pyspark pyspark-sql explode
1 Answer (0 votes)

This works with Spark 2.4+.

# sample dataframe
df.show()

+----+--------+----------+-----------+-----------+-----------+-----------+-----------+
|code|part_num|start_date|daily_qty_1|daily_qty_2|daily_qty_3|daily_qty_4|daily_qty_5|
+----+--------+----------+-----------+-----------+-----------+-----------+-----------+
|   1|      12|  20200102|          1|          2|          3|          4|          5|
+----+--------+----------+-----------+-----------+-----------+-----------+-----------+
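For a self-contained run, here is a minimal sketch that builds this sample dataframe; the column types (integers for the quantities, a yyyyMMdd string for start_date) are assumptions on my part, not something the answer specifies:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Assumed schema: start_date is a yyyyMMdd string, quantities are ints.
df = spark.createDataFrame(
    [(1, 12, "20200102", 1, 2, 3, 4, 5)],
    ["code", "part_num", "start_date",
     "daily_qty_1", "daily_qty_2", "daily_qty_3",
     "daily_qty_4", "daily_qty_5"],
)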

First, concat_ws all the daily_qty columns with a separator, then split on that separator to get them back as an array. Take the size of that array (F.size) so that, based on the total number of daily_qty columns, you can build a sequence of dates starting at start_date. Then combine arrays_zip with explode and select your columns.

from pyspark.sql import functions as F

result = (
    df
    # gather all daily_qty_* columns into one array column
    .withColumn("daily_qty", F.split(F.concat_ws(",", *(c for c in df.columns if c.startswith("daily_qty"))), ","))
    .withColumn("size", F.size("daily_qty"))
    .withColumn("start_date", F.col("start_date").cast("long"))
    # one yyyyMMdd number per quantity, counting up from start_date
    .withColumn("date", F.expr("sequence(start_date, start_date + bigint(size) - 1, 1)")).drop("size")
    # pair each date with its quantity, then explode into one row per pair
    .withColumn("temp", F.explode(F.arrays_zip("date", "daily_qty")))
    .select("code", "part_num", "temp.date", "temp.daily_qty")
    .withColumn("date", F.to_date(F.col("date").cast("string"), "yyyyMMdd"))
)
result.show()

+----+--------+----------+---------+
|code|part_num|      date|daily_qty|
+----+--------+----------+---------+
|   1|      12|2020-01-02|        1|
|   1|      12|2020-01-03|        2|
|   1|      12|2020-01-04|        3|
|   1|      12|2020-01-05|        4|
|   1|      12|2020-01-06|        5|
+----+--------+----------+---------+
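One caveat: the sequence above increments the raw yyyyMMdd number, so it only yields valid dates while the range stays inside a single month (e.g. 20200131 + 1 = 20200132, which to_date cannot parse). Below is a sketch of a variant that sequences over real dates instead, using the same Spark 2.4+ builtins (sequence over dates with a one-day interval step); the intermediate column names here are my own choice:

from pyspark.sql import functions as F

qty_cols = [c for c in df.columns if c.startswith("daily_qty")]

result = (
    df.withColumn("start", F.to_date(F.col("start_date").cast("string"), "yyyyMMdd"))
      # a sequence of real dates survives month and year boundaries
      .withColumn("date_arr", F.expr("sequence(start, date_add(start, {}), interval 1 day)".format(len(qty_cols) - 1)))
      .withColumn("qty_arr", F.array(*[F.col(c) for c in qty_cols]))
      .withColumn("temp", F.explode(F.arrays_zip("date_arr", "qty_arr")))
      .select("code", "part_num",
              F.col("temp.date_arr").alias("date"),
              F.col("temp.qty_arr").alias("daily_qty"))
)
result.show()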

(Welcome to SO!)
