如何在不使用for循环的情况下从pyspark中的列表创建数据框?

问题描述 投票:-4回答:2

我有如下列表:

rrr=[[(1,(3,1)),(2, (3,2)),(3, (3, 2)),(1,(4,1)),(2, (4,2))]]

df_input = []

接下来我定义了如下标题:

df_header=['sid', 'tid', 'srank']

使用for循环将数据附加到空列表中:

for i in rrr:
     for j in i:
            df_input.append((j[0], j[1][0], j[1][1]))
df_input

Output : [(1, 3, 1), (2, 3, 2), (3, 3, 2)]

创建数据框如下:

   df = spark.createDataFrame(df_input, df_header)
    df.show()

+---+---+------+
| sid|tid|srank|
+---+---+------+
|  1|  3|     1|
|  2|  3|     2|
|  3|  3|     2|
+---+---+------+

现在我的问题是如何在不使用任何外部for循环的情况下创建数据框架(如上所述)。输入列表包含超过1个Lakh记录。

apache-spark pyspark apache-spark-sql spark-dataframe pyspark-sql
2个回答
1
投票

当您意识到您的初始列表是嵌套列表时。即实际列表作为外部列表的唯一元素,那么您将通过仅考虑其第一个(也是唯一的)元素来看到解决方案很容易:

spark.version
#  u'2.1.1'

from pyspark.sql import Row

# your exact data:
rrr=[[(1,(3,1)),(2, (3,2)),(3, (3, 2)),(1,(4,1)),(2, (4,2))]]
df_header=['sid', 'tid', 'srank']

df = sc.parallelize(rrr[0]).map(lambda x: Row(x[0], x[1][0],x[1][1])).toDF(schema=df_header)
df.show()
# +---+---+-----+ 
# |sid|tid|srank|
# +---+---+-----+
# |  1|  3|    1|
# |  2|  3|    2|
# |  3|  3|    2|
# |  1|  4|    1| 
# |  2|  4|    2|
# +---+---+-----+

0
投票

解决方案一:引入toDF()转换(但修改了输入)

from pyspark.sql import Row    
ar=[[1,(3,1)],[2, (3,2)],[3, (3,2)]]
sc.parallelize(ar).map(lambda x: Row(sid=x[0], tid=x[1][0],srank=x[1][1])).toDF().show()

+---+-----+---+
|sid|srank|tid|
+---+-----+---+
|  1|    1|  3|
|  2|    2|  3|
|  3|    2|  3|
+---+-----+---+

解决方案2:使用请求的输入矩阵使用列表理解,numpy flatten和reshape

import numpy as np 
x=[[(1,(3,1)),(2, (3,2)),(3, (3, 2))]]
ar=[[(j[0],j[1][0],j[1][1]) for j in i] for i in x]
flat=np.array(ar).flatten()
flat=flat.reshape(len(flat)/3, 3)
sc.parallelize(flat).map(lambda x: Row(sid=int(x[0]),tid=int(x[1]),srank=int(x[2]))).toDF().show()

+---+-----+---+
|sid|srank|tid|
+---+-----+---+
|  1|    1|  3|
|  2|    2|  3|
|  3|    2|  3|
+---+-----+---+

#works also with N,M matrix
number_columns=3
x=[[(1,(3,1)),(2, (3,2)),(3, (3, 2))],[(5,(6,7)),(8, (9,10)),(11, (12, 13))]]
ar=[[(j[0],j[1][0],j[1][1]) for j in i] for i in x]
flat=np.array(ar).flatten()
flat=flat.reshape(int(len(flat)/number_columns), number_columns)
sc.parallelize(flat).map(lambda x: Row(sid=int(x[0]),tid=int(x[1]),srank=int(x[2]))).toDF().show()
+---+-----+---+
|sid|srank|tid|
+---+-----+---+
|  1|    1|  3|
|  2|    2|  3|
|  3|    2|  3|
|  5|    7|  6|
|  8|   10|  9|
| 11|   13| 12|
+---+-----+---+
© www.soinside.com 2019 - 2024. All rights reserved.