我收到下面的 CSV 文件(没有标题)-
D,neel,32,1,pin1,state1,male
D,sani,31,2,pin1,state1,pin2,state2,female
D,raja,33,3,pin1,state1,pin2,state2,pin3,state3,male
我想使用 pyspark dataframe 创建下面的 CSV 文件 -
D,neel,32,1,pin1,state1,male
D,sani,31,2,pin1,state1,female
D,sani,31,2,pin2,state2,female
D,raja,33,3,pin1,state1,male
D,raja,33,3,pin2,state2,male
D,raja,33,3,pin3,state3,male
注意: 输入文件中第 4 列的数字决定记录中有多少个引脚和状态列。 喜欢
as neel has 1 in the 4th column, thus neel has 1 set of pin and state (pin1,state1)
as sani has 2 in 4th column, thus sani has 2 sets of pin and state (pin1,state1,pin2,state2
as raja has 3 in 4th column, thus raja has 3 sets of pin and state (pin1,state1,pin2,state2,pin3,state3)
我无法达到我想要的输出..
没有 CSV 数据源选项来处理此类动态情况。
一种方法是将其读取为 CSV 并指定具有最大已知列数的架构。
然后您可以使用
基于第四列应用转换case
when col4 = 1 then col7
When col4 = 2 then col8
...
End as gender
您可以将此逻辑应用于每个动态列,如果 col4 中的数字较大,也可以通过脚本生成它。
另一种方法是将文件作为字符串 rdd 读取并使用自定义代码解析内容。
由于文本文件是非结构化的,我们可以使用自定义函数并将其应用于 RDD 来解析文本文件。
def explode(row):
a, b, c, d, *e, f = row.split(',')
return [[a, b, c, d, *e[i: i+2], f]
for i in range(0, len(e), 2)]
df = spark.sparkContext.textFile('data.csv').flatMap(explode).toDF()
df.show()
+---+----+---+---+----+------+------+
| _1| _2| _3| _4| _5| _6| _7|
+---+----+---+---+----+------+------+
| D|neel| 32| 1|pin1|state1| male|
| D|sani| 31| 2|pin1|state1|female|
| D|sani| 31| 2|pin2|state2|female|
| D|raja| 33| 3|pin1|state1| male|
| D|raja| 33| 3|pin2|state2| male|
| D|raja| 33| 3|pin3|state3| male|
+---+----+---+---+----+------+------+