我有一个数据集,可向我提供人们在电视上观看过的内容,观看的时长以及观看的网络。我们有以下几列:
TV ID - string
show_nw - Array which has TV show concatenated with Network
All_nws - Array which has network concatenated with Duration
All_shows - Array which has show concatenated with duration
样本数据集:
TV_ID : a1001 , a1002, a1003
show_nw: ["TheFactsofLife#1001","Bewitched#1001","Survivor#1000","SEALTeam#1000","WhenWhalesWalkedJourneysinDeepTime#1002","PaidProgramming#1006"], ["AllEliteWrestlingDynamite#1003","TheAdjustmentBureau#1004","Charmed#1003"], ["TMJ4Now#1005"]
all_nws : ["1000#7062","1001#602","1002#40","1006#47"], ["1003#7328","1004#46"], ["1005#1543"]
all_shows : ["Bewitched#563","Survivor#6988","SEALTeam#74","WhenWhalesWalkedJourneysinDeepTime#40","PaidProgramming#47","TheFactsofLife#39"], ["Charmed#462","AllEliteWrestlingDynamite#6866","TheAdjustmentBureau#46"], ["TMJ4Now#1543"]
现在当我将数据集从数组分解回时
test_df = df.select("tv_id", "all_shows", "all_nws").withColumn("all_shows", explode("all_shows")).withColumn("all_nws", explode("all_nws")).withColumn("show",split(col("all_shows"),"#").getItem(0)).withColumn("network",split(col("all_nws"),"#").getItem(0))
我的输出看起来像:
tv_id all_shows all_nws show network
a1001 Bewitched#563 1000#7062 Bewitched 1000
a1001 Bewitched#563 1001#602 Bewitched 1001
a1001 Bewitched#563 1002#40 Bewitched 1002
a1001 Bewitched#563 1006#47 Bewitched 1006
a1001 Survivor#6988 1000#7062 Survivor 1000
a1001 Survivor#6988 1001#602 Survivor 1001
a1001 Survivor#6988 1002#40 Survivor 1002
a1001 Survivor#6988 1006#47 Survivor 1006
因此基本上在父数据集中,仅在网络1000上监视了Bewitched和Survivor,但是当爆炸时,我们发现它们都与TV_ID拥有的所有网络相关联。在这种情况下,爆炸后如何获得正确的数据集?
我认为您需要在爆炸前做zip arrays
。
来自Spark-2.4
使用arrays_zip
功能。
对于Spark < 2.4
需要使用udf。
Example:(Spark-2.4)
#sample dataframe.
df.show()
#+-----+--------------------+--------------------+--------------------+
#|tv_id| show_nw| all_nws| all_shows|
#+-----+--------------------+--------------------+--------------------+
#|a1001|[TheFactsofLife#1...|[1000#7062, 1001#...|[Bewitched#563, S...|
#|a1002|[AllEliteWrestlin...|[1003#7328, 1004#46]|[Charmed#462, All...|
#|a1003| [TMJ4Now#1005]| [1005#1543]| [TMJ4Now#1543]|
#+-----+--------------------+--------------------+--------------------+
df.select("tv_id", "all_shows", "all_nws").\
withColumn("all_shows", explode(arrays_zip("all_shows","all_nws"))).\
select("tv_id","all_shows.*").\
withColumn("show",split(col("all_shows"),"#").getItem(0)).\
withColumn("network",split(col("all_nws"),"#").getItem(0)).\
show()
#+-----+--------------------+---------+--------------------+-------+
#|tv_id| all_shows| all_nws| show|network|
#+-----+--------------------+---------+--------------------+-------+
#|a1001| Bewitched#563|1000#7062| Bewitched| 1000|
#|a1001| Survivor#6988| 1001#602| Survivor| 1001|
#|a1001| SEALTeam#74| 1002#40| SEALTeam| 1002|
#|a1001|WhenWhalesWalkedJ...| 1006#47|WhenWhalesWalkedJ...| 1006|
#|a1001| PaidProgramming#47| null| PaidProgramming| null|
#|a1001| TheFactsofLife#39| null| TheFactsofLife| null|
#|a1002| Charmed#462|1003#7328| Charmed| 1003|
#|a1002|AllEliteWrestling...| 1004#46|AllEliteWrestling...| 1004|
#|a1002|TheAdjustmentBure...| null| TheAdjustmentBureau| null|
#|a1003| TMJ4Now#1543|1005#1543| TMJ4Now| 1005|
#+-----+--------------------+---------+--------------------+-------+