将 pyspark 列转换为 python 列表的最快方法是什么?

问题描述 投票:0回答:2

我有一个大的 pyspark 数据框,但使用如下所示的小数据框来测试性能。我知道将 pyspark 列转换为列表的三种方法,但没有一种方法与 spark 作业的运行速度一样快。从 pyspark 数据框列创建 python 列表的最佳和最快方法是什么?

Dataframe:-

from pyspark.sql import Row

df = spark.createDataFrame([
    Row(a=1, b=4., c='GFG1'),
    Row(a=2, b=8., c='GFG2'),
    Row(a=4, b=5., c='GFG3')
]) 

method 1:-
list1=[]
for row_iterator in df.collect():
    list1.append(row_iterator['b'])
output:- [4.0, 8.0, 5.0]
time taken:- 0.20 seconds

method2:-
list2 = df.rdd.map(lambda x: x.b).collect()
list2
output:- [4.0, 8.0, 5.0]
time taken:- 0.30 seconds

method3:-
list3 = df.select(df.b).rdd.flatMap(lambda x: x).collect()
list3
output:- [4.0, 8.0, 5.0]
time taken:- 0.34 seconds

虽然遍历行的速度更快,但我正在寻找一个比这更快的结果的更好的解决方案,因为我的 pyspark 数据框很大(10k 行数据)。有没有最快的方法将 pyspark 列转换为 python 列表?

python list apache-spark pyspark rdd
2个回答
0
投票

有一个函数叫做

collect_list
。它将只收集1列而不是像正常收集那样收集整个数据框。当我在我的环境中测试时,它比你的 3 种方法快得多。您也可以在您的环境中进行测试。

from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_list
import time
spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([
    Row(a=1, b=4., c='GFG1'),
    Row(a=2, b=8., c='GFG2'),
    Row(a=4, b=5., c='GFG3')
]) 

#method 1:-
start_time = time.time()
list1=[]
for row_iterator in df.collect():
    list1.append(row_iterator['b'])
end_time = time.time()
print(end_time - start_time)
#output:- [4.0, 8.0, 5.0]
#time taken:- 7.4 seconds

#method2:-
start_time = time.time()
list2 = df.rdd.map(lambda x: x.b).collect()
end_time = time.time()
print(end_time - start_time)
#output:- [4.0, 8.0, 5.0]
#time taken:- 6.2 seconds

#method3:-
start_time = time.time()
list3 = df.select(df.b).rdd.flatMap(lambda x: x).collect()
end_time = time.time()
print(end_time - start_time)
#output:- [4.0, 8.0, 5.0]
#time taken:- 6.1 seconds

#my method (using collect_list):
start_time = time.time()
list4 = df.select(collect_list("b")).first()[0]
end_time = time.time()
print(end_time - start_time)
#output:- [4.0, 8.0, 5.0]
#time taken:- 3.7 seconds

仍然,不建议使用任何一种收集方法,因为您不会从 Spark 的并行处理功能中受益。


0
投票

试试这个

from pyspark.sql import Row

df = spark.createDataFrame([
    Row(a=1, b=4., c='GFG1'),
    Row(a=2, b=8., c='GFG2'),
    Row(a=4, b=5., c='GFG3')
]) 

方法一:

df.select("a").rdd.map(lambda row: row[0]).collect()

方法二:

df.rdd.map(lambda row: row.asDict()["a"]).collect()

这两种方法都需要大约 0.43 秒 我在具有单节点配置的 Databricks 社区版上尝试了这个,这可能会根据您用于数据处理的集群而改变。

© www.soinside.com 2019 - 2024. All rights reserved.