如何将PySpark RDD线性列表转换为DataFrame?

问题描述 投票:-2回答:2

我想将线性列表转换为数据帧。即给出以下清单,

a = ["a1", "a2", "a3", b1", "b2", "b3", "c1", "c2", "c3"]

预期的结果是,

+--------------------+
| col1 | col2 | col3 |
+--------------------+
|  a1  |  a2  |  a3  |
|  b1  |  b2  |  b3  |
|  c1  |  c2  |  c3  |
+--------------------+

我尝试了以下但是出了错误。

from pyspark.sql.types import *

a = ["a1", "a2", "a3", "b1", "b2", "b3", "c1", "c2", "c3"]

rdd = sc.parallelize(a)

schema = StructType([
     StructField("a", StringType(), True),
     StructField("b", StringType(), True),
     StructField("c", StringType(), True)
     ])

df = sqlContext.createDataFrame(rdd, schema)

df.show()

最后一个show()语句出现错误“作业因阶段失败而中止”。请有人告诉我解决方案吗?谢谢。

apache-spark dataframe pyspark rdd
2个回答
1
投票

基于你的comment,我认为你从rdd而不是列表开始。

我进一步假设您正在根据rdd的索引确定顺序。如果这些假设是正确的,您可以使用zipWithIndex()为每条记录添加行号。

然后将行数除以3(使用整数除法)以对每3个连续记录进行分组。接下来使用groupByKey()将具有相同key的记录聚合成一个元组。

最后,放下钥匙并致电toDF()

rdd.zipWithIndex()\
    .map(lambda row: (row[1]//3, row[0]))\
    .groupByKey()\
    .map(lambda row: tuple(row[1]))\
    .toDF(["a", "b", "c"])\
    .show()
#+---+---+---+
#|  a|  b|  c|
#+---+---+---+
#| a1| a2| a3|
#| c1| c2| c3|
#| b1| b2| b3|
#+---+---+---+

0
投票

这是一种有希望符合您标准的方法

# First get a 1 column DF
df = sql.createDataFrame(sc.parallelize(a).map(lambda x: [x]), schema=['col'])
# split each value into a number and letter e.g. 'a1' --> ['a','1']) 
df = df.withColumn('letter', f.split('col', '').getItem(0))
df = df.withColumn('number', f.split('col', '').getItem(1))

# Now pivot to get what you want (dropping extraneous columns and ordering 
# to get exact output

output = (df.groupBy('letter')
          .pivot('number')
          .agg(f.first('col'))
          .select([f.col(column).alias('col%s'%(column)) for column in ['1','2','3']])
          .orderBy('col1')
          .drop('letter'))
© www.soinside.com 2019 - 2024. All rights reserved.