使用Python的减少()来连接多个PySpark DataFrames

问题描述 投票:3回答:2

有谁知道为什么连接多个PySpark DataFrames比使用functools.reduce()循环迭代刚刚加入同一DataFrames在使用Python3的for会导致更糟糕的表现?具体而言,这给出了一个巨大的减速,接着存储器外的一个错误:

def join_dataframes(list_of_join_columns, left_df, right_df):
    return left_df.join(right_df, on=list_of_join_columns)

joined_df = functools.reduce(
    functools.partial(join_dataframes, list_of_join_columns), list_of_dataframes,
)

而这其中并不:

joined_df = list_of_dataframes[0]
joined_df.cache()
for right_df in list_of_dataframes[1:]:
    joined_df = joined_df.join(right_df, on=list_of_join_columns)

任何想法将不胜感激。谢谢!

python python-3.x pyspark spark-dataframe pyspark-sql
2个回答
-1
投票

一个原因是,一个降低或折叠通常是功能上纯:每个累积操作的结果不被写入存储器的同一部件,而是新的存储块。

原则上,垃圾收集器会释放每个累积后的前一个块,但如果没有,你会贮液器的每个更新版本分配内存。


1
投票

只要你使用CPython的(不同的实现就可以了,但实际上不应该在这种特殊情况下表现出不同的显著行为)。如果你看一看reduce implementation你会看到它只是一个for循环以最小的异常处理。

其核心是完全等同于你使用循环

for element in it:
    value = function(value, element)

并没有证据支持的任何特殊行为的权利要求。

另外简单的测试与星火的实际限制帧数连接(连接are among the most expensive operations in Spark

dfs = [
    spark.range(10000).selectExpr(
        "rand({}) AS id".format(i), "id AS value",  "{} AS loop ".format(i)
    )
    for i in range(200)
]

显示定时直接用于环之间没有显著差异

def f(dfs):
    df1 = dfs[0]
    for df2 in dfs[1:]:
        df1 = df1.join(df2, ["id"])
    return df1

%timeit -n3 f(dfs)                 
## 6.25 s ± 257 ms per loop (mean ± std. dev. of 7 runs, 3 loops each)

reduce调用

from functools import reduce

def g(dfs):
    return reduce(lambda x, y: x.join(y, ["id"]), dfs) 

%timeit -n3 g(dfs)
### 6.47 s ± 455 ms per loop (mean ± std. dev. of 7 runs, 3 loops each)

同样整体JVM的行为模式是循环之间的可比性

For loop CPU and Memory Usage - VisualVM

reduce

reduce CPU and Memory Usage - VisualVM

最后都产生相同的执行计划

g(dfs)._jdf.queryExecution().optimizedPlan().equals( 
    f(dfs)._jdf.queryExecution().optimizedPlan()
)
## True

这表明,当计划进行评估和奥姆斯有可能出现没有任何区别。

换句话说,你的相关性并不意味着因果关系,并观察性能问题不太可能与你使用DataFrames结合的方法。

© www.soinside.com 2019 - 2024. All rights reserved.