我有一个大的 pyspark 数据框,但使用如下所示的小数据框来测试性能。我知道将 pyspark 列转换为列表的三种方法,但没有一种方法与 spark 作业的运行速度一样快。从 pyspark 数据框列创建 python 列表的最佳和最快方法是什么?
Dataframe:-
from pyspark.sql import Row
df = spark.createDataFrame([
Row(a=1, b=4., c='GFG1'),
Row(a=2, b=8., c='GFG2'),
Row(a=4, b=5., c='GFG3')
])
method 1:-
list1=[]
for row_iterator in df.collect():
list1.append(row_iterator['b'])
output:- [4.0, 8.0, 5.0]
time taken:- 0.20 seconds
method2:-
list2 = df.rdd.map(lambda x: x.b).collect()
list2
output:- [4.0, 8.0, 5.0]
time taken:- 0.30 seconds
method3:-
list3 = df.select(df.b).rdd.flatMap(lambda x: x).collect()
list3
output:- [4.0, 8.0, 5.0]
time taken:- 0.34 seconds
虽然遍历行的速度更快,但我正在寻找一个比这更快的结果的更好的解决方案,因为我的 pyspark 数据框很大(10k 行数据)。有没有最快的方法将 pyspark 列转换为 python 列表?
有一个函数叫做
collect_list
。它将只收集1列而不是像正常收集那样收集整个数据框。当我在我的环境中测试时,它比你的 3 种方法快得多。您也可以在您的环境中进行测试。
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_list
import time
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([
Row(a=1, b=4., c='GFG1'),
Row(a=2, b=8., c='GFG2'),
Row(a=4, b=5., c='GFG3')
])
#method 1:-
start_time = time.time()
list1=[]
for row_iterator in df.collect():
list1.append(row_iterator['b'])
end_time = time.time()
print(end_time - start_time)
#output:- [4.0, 8.0, 5.0]
#time taken:- 7.4 seconds
#method2:-
start_time = time.time()
list2 = df.rdd.map(lambda x: x.b).collect()
end_time = time.time()
print(end_time - start_time)
#output:- [4.0, 8.0, 5.0]
#time taken:- 6.2 seconds
#method3:-
start_time = time.time()
list3 = df.select(df.b).rdd.flatMap(lambda x: x).collect()
end_time = time.time()
print(end_time - start_time)
#output:- [4.0, 8.0, 5.0]
#time taken:- 6.1 seconds
#my method (using collect_list):
start_time = time.time()
list4 = df.select(collect_list("b")).first()[0]
end_time = time.time()
print(end_time - start_time)
#output:- [4.0, 8.0, 5.0]
#time taken:- 3.7 seconds
仍然,不建议使用任何一种收集方法,因为您不会从 Spark 的并行处理功能中受益。
试试这个
from pyspark.sql import Row
df = spark.createDataFrame([
Row(a=1, b=4., c='GFG1'),
Row(a=2, b=8., c='GFG2'),
Row(a=4, b=5., c='GFG3')
])
方法一:
df.select("a").rdd.map(lambda row: row[0]).collect()
方法二:
df.rdd.map(lambda row: row.asDict()["a"]).collect()
这两种方法都需要大约 0.43 秒 我在具有单节点配置的 Databricks 社区版上尝试了这个,这可能会根据您用于数据处理的集群而改变。