我想将线性列表转换为数据帧。即给出以下清单,
a = ["a1", "a2", "a3", b1", "b2", "b3", "c1", "c2", "c3"]
预期的结果是,
+--------------------+
| col1 | col2 | col3 |
+--------------------+
| a1 | a2 | a3 |
| b1 | b2 | b3 |
| c1 | c2 | c3 |
+--------------------+
我尝试了以下但是出了错误。
from pyspark.sql.types import *
a = ["a1", "a2", "a3", "b1", "b2", "b3", "c1", "c2", "c3"]
rdd = sc.parallelize(a)
schema = StructType([
StructField("a", StringType(), True),
StructField("b", StringType(), True),
StructField("c", StringType(), True)
])
df = sqlContext.createDataFrame(rdd, schema)
df.show()
最后一个show()语句出现错误“作业因阶段失败而中止”。请有人告诉我解决方案吗?谢谢。
基于你的comment,我认为你从rdd
而不是列表开始。
我进一步假设您正在根据rdd
的索引确定顺序。如果这些假设是正确的,您可以使用zipWithIndex()
为每条记录添加行号。
然后将行数除以3(使用整数除法)以对每3个连续记录进行分组。接下来使用groupByKey()
将具有相同key
的记录聚合成一个元组。
最后,放下钥匙并致电toDF()
rdd.zipWithIndex()\
.map(lambda row: (row[1]//3, row[0]))\
.groupByKey()\
.map(lambda row: tuple(row[1]))\
.toDF(["a", "b", "c"])\
.show()
#+---+---+---+
#| a| b| c|
#+---+---+---+
#| a1| a2| a3|
#| c1| c2| c3|
#| b1| b2| b3|
#+---+---+---+
这是一种有希望符合您标准的方法
# First get a 1 column DF
df = sql.createDataFrame(sc.parallelize(a).map(lambda x: [x]), schema=['col'])
# split each value into a number and letter e.g. 'a1' --> ['a','1'])
df = df.withColumn('letter', f.split('col', '').getItem(0))
df = df.withColumn('number', f.split('col', '').getItem(1))
# Now pivot to get what you want (dropping extraneous columns and ordering
# to get exact output
output = (df.groupBy('letter')
.pivot('number')
.agg(f.first('col'))
.select([f.col(column).alias('col%s'%(column)) for column in ['1','2','3']])
.orderBy('col1')
.drop('letter'))