Different number of partitions when unioning Spark DataFrames with the Scala and Python APIs

Problem description · votes: 1 · answers: 1

I am checking the number of partitions of the union of two identical Spark DataFrames, and I noticed that the results are not the same between the Scala and Python APIs.

In Python, the number of partitions of the union is the sum of the partition counts of the two DataFrames, which is the expected behaviour.

from pyspark.sql.types import IntegerType

df1 = spark.createDataFrame(range(100000), IntegerType()).repartition(10)
print("df1 partitions: %d" %df1.rdd.getNumPartitions())

df2 = spark.createDataFrame(range(100000), IntegerType()).repartition(10)
print("df2 partitions: %d" %df2.rdd.getNumPartitions())

df3 = df1.union(df2)
print("df3 partitions: %d" %df3.rdd.getNumPartitions())

Result:

df1 partitions: 10
df2 partitions: 10
df3 partitions: 20

However, in Scala the number of partitions of the union does not change.

Scala

val df1 = (1 to 100000).toDF.repartition(10)
println(s"df1 partitions: ${df1.rdd.getNumPartitions}")

val df2 = (1 to 100000 by 1).toDF.repartition(10)
println(s"df2 partitions: ${df2.rdd.getNumPartitions}")

val df3 = df1.union(df2)
println(s"df3 partitions: ${df3.rdd.getNumPartitions}")

Result:

df1 partitions: 10
df2 partitions: 10
df3 partitions: 10

This only happens when the two DataFrames are built in exactly the same way.

When they are not:

val df1 = (1 to 100000).toDF.repartition(10)
println(s"df1 partitions: ${df1.rdd.getNumPartitions}")

val df2 = (1 to 100000 by 2).toDF.repartition(10)
println(s"df2 partitions: ${df2.rdd.getNumPartitions}")

val df3 = df1.union(df2)
println(s"df3 partitions: ${df3.rdd.getNumPartitions}")

I get the expected result (the sum):

df1 partitions: 10
df2 partitions: 10
df3 partitions: 20

My understanding is that with the Scala API, Spark is able to optimise the union in some cases. Is that true? Does it mean that the execution plan of a union can differ between the Scala and Python APIs?

I am asking because I have noticed that unions are more performant with Scala than with Python, especially in the case of multiple successive unions.
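
One way to compare what each API actually runs is to print the plans of the union; a minimal Scala sketch, using the same df1 and df2 as above:

// Printing the plans shows whether the two branches of the Union share a shuffle.
val df3 = df1.union(df2)
df3.explain()      // physical plan only
df3.explain(true)  // parsed, analyzed, optimized and physical plans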

scala apache-spark pyspark union partition
1 Answer
1 vote

The clue is in the explain output from the Scala engine:

Union
:- Exchange RoundRobinPartitioning(10), [id=#757]
:  +- LocalTableScan [value#154]
+- ReusedExchange [value#159], Exchange RoundRobinPartitioning(10), [id=#757]

ReusedExchange is a form of optimisation: Catalyst sees that the two sides are identical.

If you had one DataFrame with 10000 entries and the other with 10001 entries, then you would get 20 partitions. Spark has some smarts.
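
If you actually want the union of two identical DataFrames to keep all 20 partitions, one workaround is to turn the reuse off; a minimal sketch, assuming a spark-shell session and the internal (undocumented) configuration key spark.sql.exchange.reuse:

// With exchange reuse disabled, each branch keeps its own shuffle, so the
// union of two identical 10-partition DataFrames ends up with 20 partitions.
spark.conf.set("spark.sql.exchange.reuse", "false")

val df1 = (1 to 100000).toDF.repartition(10)
val df2 = (1 to 100000).toDF.repartition(10)
println(df1.union(df2).rdd.getNumPartitions)  // expected: 20

Alternatively, an explicit df1.union(df2).repartition(20) gives the same partition count without touching internal settings.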


3 votes

Definition of union in Spark (Scala):

def union(other: Dataset[T]): Dataset[T] = withSetOperator {
    // This breaks caching, but it's usually ok because it addresses a very specific use case:
    // using union to union many files or partitions.
    CombineUnions(Union(logicalPlan, other.logicalPlan))
  }
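
The CombineUnions rule mentioned in that comment flattens a chain of unions into a single Union node, and since PySpark delegates to this same JVM method (see the definition below), both APIs benefit from it. A minimal sketch of its effect, assuming a spark-shell session:

// Three chained unions end up as one Union node with three children,
// visible in the optimized logical plan printed by explain(true).
val a = (1 to 10).toDF("value")
val b = (11 to 20).toDF("value")
val c = (21 to 30).toDF("value")

a.union(b).union(c).explain(true)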

Definition of union in PySpark:

def union(self, other):
    """Return a new :class:`DataFrame` containing union of rows in this and another
    :class:`DataFrame`.

    This is equivalent to `UNION ALL` in SQL. To do a SQL-style set union
    (that does deduplication of elements), use this function followed by :func:`distinct`.

    Also as standard in SQL, this function resolves columns by position (not by name).
    """
    return DataFrame(self._jdf.union(other._jdf), self.sql_ctx)

Refer to the code here to understand the difference between the two:

https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala

https://github.com/apache/spark/blob/master/python/pyspark/sql/dataframe.py


1 vote

Will keep updating this answer if I find anything interesting.

Observation 1 ---- the physical plans differ between Scala and Python

union physical plan pyspark 
:- Exchange RoundRobinPartitioning(10), [id=#1318]
:  +- *(1) Scan ExistingRDD[value#148]
+- Exchange RoundRobinPartitioning(10), [id=#1320]
   +- *(2) Scan ExistingRDD[value#154]



== Physical Plan scala  ==
Union
:- Exchange RoundRobinPartitioning(10), [id=#1012]
:  +- LocalTableScan [value#122]
+- ReusedExchange [value#131], Exchange RoundRobinPartitioning(10), [id=#1012]


== Physical Plan scala, with val df2 = (1 to 10 by 2).toDF.repartition(10) ==
Union
:- Exchange RoundRobinPartitioning(10), [id=#1644]
:  +- LocalTableScan [value#184]
+- Exchange RoundRobinPartitioning(10), [id=#1646]
   +- LocalTableScan [value#193]

Observation 2 -- I believe it is the explicit repartition of df1 and df2 that causes the variation in the partition count of the union df3. If you do not explicitly repartition your input DataFrames, you end up with a union df whose partition count equals the sum of the partitions of df1 and df2. I tried the following permutations on the same data and got the results shown below.

Case 1

from pyspark.sql.types import IntegerType

df1 = spark.createDataFrame(range(100000), IntegerType())
print("df1 partitions: %d" %df1.rdd.getNumPartitions())
print("df1 partitioner: %s" %df1.rdd.partitioner)

df2 = spark.createDataFrame(range(100000), IntegerType())
print("df2 partitions: %d" %df2.rdd.getNumPartitions())
print("df2 partitioner: %s" %df2.rdd.partitioner)

df3 = df1.union(df2)
print("df3 partitions: %d" %df3.rdd.getNumPartitions())
print("df3 partitioner: %s" %df3.rdd.partitioner)

***************

df1 partitions: 8
df1 partitioner: None
df2 partitions: 8
df2 partitioner: None

df3 partitions: 16
df3 partitioner: None

Case 2

val df1 = (1 to 100000).toDF
println(s"df1 partitions: ${df1.rdd.getNumPartitions}")
println(s"df1 partitioner: ${df1.rdd.partitioner}")

val df2 = (1 to 100000).toDF
println(s"df2 partitions: ${df2.rdd.getNumPartitions}")
println(s"df2 partitioner: ${df2.rdd.partitioner}")

df1.union(df2).explain()

val df3 = df1.union(df2)
println(s"df3 partitions: ${df3.rdd.getNumPartitions}")
println(s"df3 partitioner: ${df3.rdd.partitioner}")

***************

df1 partitions: 8
df1 partitioner: None
df2 partitions: 8
df2 partitioner: None
df3 partitions: 16
df3 partitioner: None

Case 3

val df1 = (1 to 100000).toDF
println(s"df1 partitions: ${df1.rdd.getNumPartitions}")
println(s"df1 partitioner: ${df1.rdd.partitioner}")
val df2 = (1 to 100000 by 2).toDF
println(s"df2 partitions: ${df2.rdd.getNumPartitions}")
println(s"df2 partitioner: ${df2.rdd.partitioner}")
val df3 = df1.union(df2)
println(s"df3 partitions: ${df3.rdd.getNumPartitions}")
println(s"df3 partitioner: ${df3.rdd.partitioner}")

***************

df1 partitions: 8
df1 partitioner: None
df2 partitions: 8
df2 partitioner: None
df3 partitions: 16
df3 partitioner: None