处理 pyspark 中的动态列数(csv 文件)

问题描述 投票:0回答:2

我收到下面的 CSV 文件(没有标题)-

D,neel,32,1,pin1,state1,male
D,sani,31,2,pin1,state1,pin2,state2,female
D,raja,33,3,pin1,state1,pin2,state2,pin3,state3,male

我想使用 pyspark dataframe 创建下面的 CSV 文件 -

D,neel,32,1,pin1,state1,male
D,sani,31,2,pin1,state1,female
D,sani,31,2,pin2,state2,female
D,raja,33,3,pin1,state1,male
D,raja,33,3,pin2,state2,male
D,raja,33,3,pin3,state3,male

注意: 输入文件中第 4 列的数字决定记录中有多少个引脚和状态列。 喜欢

as neel has 1 in the 4th column, thus neel has 1 set of pin and state  (pin1,state1)
as sani has 2 in 4th column, thus sani has 2 sets of pin and state  (pin1,state1,pin2,state2
as raja has 3 in 4th column, thus raja has 3 sets of pin and state  (pin1,state1,pin2,state2,pin3,state3)

我无法达到我想要的输出..

python dataframe pyspark apache-spark-sql
2个回答
0
投票

没有 CSV 数据源选项来处理此类动态情况。

一种方法是将其读取为 CSV 并指定具有最大已知列数的架构。

然后您可以使用

基于第四列应用转换
case 
when col4 = 1 then col7
When col4 = 2 then col8
...
End as gender

您可以将此逻辑应用于每个动态列,如果 col4 中的数字较大,也可以通过脚本生成它。

另一种方法是将文件作为字符串 rdd 读取并使用自定义代码解析内容。


0
投票

由于文本文件是非结构化的,我们可以使用自定义函数并将其应用于 RDD 来解析文本文件。

def explode(row):
    a, b, c, d, *e, f = row.split(',')
    return [[a, b, c, d, *e[i: i+2], f] 
            for i in range(0, len(e), 2)]

df = spark.sparkContext.textFile('data.csv').flatMap(explode).toDF()

df.show()

+---+----+---+---+----+------+------+
| _1|  _2| _3| _4|  _5|    _6|    _7|
+---+----+---+---+----+------+------+
|  D|neel| 32|  1|pin1|state1|  male|
|  D|sani| 31|  2|pin1|state1|female|
|  D|sani| 31|  2|pin2|state2|female|
|  D|raja| 33|  3|pin1|state1|  male|
|  D|raja| 33|  3|pin2|state2|  male|
|  D|raja| 33|  3|pin3|state3|  male|
+---+----+---+---+----+------+------+
© www.soinside.com 2019 - 2024. All rights reserved.