I am checking the number of partitions of a union of 2 identical Spark DataFrames, and I noticed that the results differ between the Scala and Python APIs.
In Python, the number of partitions of the union is the sum of the partition counts of the 2 DataFrames, which is the expected behavior:
from pyspark.sql.types import IntegerType
df1 = spark.createDataFrame(range(100000), IntegerType()).repartition(10)
print("df1 partitions: %d" %df1.rdd.getNumPartitions())
df2 = spark.createDataFrame(range(100000), IntegerType()).repartition(10)
print("df2 partitions: %d" %df2.rdd.getNumPartitions())
df3 = df1.union(df2)
print("df3 partitions: %d" %df3.rdd.getNumPartitions())
Result:
df1 partitions: 10
df2 partitions: 10
df3 partitions: 20
However, in Scala, the number of partitions of the union does not change.
Scala:
val df1 = (1 to 100000).toDF.repartition(10)
println(s"df1 partitions: ${df1.rdd.getNumPartitions}")
val df2 = (1 to 100000 by 1).toDF.repartition(10)
println(s"df2 partitions: ${df2.rdd.getNumPartitions}")
val df3 = df1.union(df2)
println(s"df3 partitions: ${df3.rdd.getNumPartitions}")
Result:
df1 partitions: 10
df2 partitions: 10
df3 partitions: 10
This only happens when the two DataFrames are built in exactly the same way. When they are not:
val df1 = (1 to 100000).toDF.repartition(10)
println(s"df1 partitions: ${df1.rdd.getNumPartitions}")
val df2 = (1 to 100000 by 2).toDF.repartition(10)
println(s"df2 partitions: ${df2.rdd.getNumPartitions}")
val df3 = df1.union(df2)
println(s"df3 partitions: ${df3.rdd.getNumPartitions}")
I get the expected result (the sum):
df1 partitions: 10
df2 partitions: 10
df3 partitions: 20
My understanding is that in some cases the Scala API lets Spark optimize the union. Is that true? Does it mean that the execution plan of a union can differ between the Scala and Python APIs?
I am asking because I noticed that Scala unions perform better than Python unions, especially when chaining multiple unions.
The clue is in the output of explain on the Scala side:
Union
:- Exchange RoundRobinPartitioning(10), [id=#757]
: +- LocalTableScan [value#154]
+- ReusedExchange [value#159], Exchange RoundRobinPartitioning(10), [id=#757]
ReusedExchange is a form of optimization: Catalyst sees that the two sides of the union are identical, so it reuses the exchange. If you had one side with 10000 entries and the other with 10001, you would get 20 partitions. Spark is smart about this.
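To make that concrete, here is a toy Python sketch of exchange reuse (purely illustrative; the `Exchange` class and `union_partitions` function are hypothetical stand-ins, not Spark's internals): when both children of the union shuffle the same child plan into the same number of partitions, the second exchange is reused and contributes no extra partitions.

```python
# Toy model of Catalyst's exchange reuse. Illustrative only: these names
# are not Spark's actual classes.

class Exchange:
    def __init__(self, plan, num_partitions):
        self.plan = plan                  # canonical description of the child plan
        self.num_partitions = num_partitions

    def key(self):
        # Two exchanges are interchangeable when they shuffle the same
        # child plan into the same number of partitions.
        return (self.plan, self.num_partitions)

def union_partitions(children):
    """Count output partitions of a union, reusing identical exchanges."""
    seen = set()
    total = 0
    for child in children:
        if child.key() in seen:
            continue                      # reused exchange: no extra partitions
        seen.add(child.key())
        total += child.num_partitions
    return total

# Identical inputs: the second exchange is reused.
a = Exchange("range(1, 100000)", 10)
b = Exchange("range(1, 100000)", 10)
print(union_partitions([a, b]))           # 10

# Different inputs: no reuse, so the counts add up.
c = Exchange("range(1, 100000, 2)", 10)
print(union_partitions([a, c]))           # 20
```

With identical inputs the toy union reports 10 partitions, matching the Scala result above; with different inputs it reports 20, matching the `1 to 100000 by 2` case.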
Definition of union in Spark (Scala):
def union(other: Dataset[T]): Dataset[T] = withSetOperator {
  // This breaks caching, but it's usually ok because it addresses a very specific use case:
  // using union to union many files or partitions.
  CombineUnions(Union(logicalPlan, other.logicalPlan))
}
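The CombineUnions rule in the Scala definition above collapses nested unions into a single Union node, which is part of why chaining many unions stays cheap. A minimal Python sketch of that flattening (a hypothetical model, not the Catalyst implementation):

```python
# Toy model of the CombineUnions rewrite: collapse nested Union nodes into
# one flat Union. The Union class here is illustrative, not Spark's.

class Union:
    def __init__(self, *children):
        self.children = list(children)

def combine_unions(plan):
    """Flatten Union(Union(a, b), c) into Union(a, b, c)."""
    if not isinstance(plan, Union):
        return plan
    flat = []
    for child in plan.children:
        child = combine_unions(child)
        if isinstance(child, Union):
            flat.extend(child.children)   # splice the nested union's children
        else:
            flat.append(child)
    return Union(*flat)

nested = Union(Union("a", "b"), "c")
print(combine_unions(nested).children)    # ['a', 'b', 'c']
```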
Definition of union in PySpark:
def union(self, other):
    """Return a new :class:`DataFrame` containing union of rows in this and another
    :class:`DataFrame`.

    This is equivalent to `UNION ALL` in SQL. To do a SQL-style set union
    (that does deduplication of elements), use this function followed by :func:`distinct`.

    Also as standard in SQL, this function resolves columns by position (not by name).
    """
    return DataFrame(self._jdf.union(other._jdf), self.sql_ctx)
Refer to the code here to understand the difference between the two:
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
https://github.com/apache/spark/blob/master/python/pyspark/sql/dataframe.py
I will keep updating this if I find anything interesting.
Observation 1 -- the physical plans differ between Scala and Python.
Union physical plan (PySpark):
:- Exchange RoundRobinPartitioning(10), [id=#1318]
: +- *(1) Scan ExistingRDD[value#148]
+- Exchange RoundRobinPartitioning(10), [id=#1320]
+- *(2) Scan ExistingRDD[value#154]
== Physical Plan (Scala) ==
Union
:- Exchange RoundRobinPartitioning(10), [id=#1012]
: +- LocalTableScan [value#122]
+- ReusedExchange [value#131], Exchange RoundRobinPartitioning(10), [id=#1012]
Scala with Range (1 to 10 by 2):
val df2 = (1 to 10 by 2).toDF.repartition(10)
== Physical Plan ==
Union
:- Exchange RoundRobinPartitioning(10), [id=#1644]
: +- LocalTableScan [value#184]
+- Exchange RoundRobinPartitioning(10), [id=#1646]
+- LocalTableScan [value#193]
Observation 2 -- I believe it is the explicit repartition of df1 and df2 that causes the partition count of the union df3 to change. If you do not explicitly repartition your input DataFrames, you end up with a union DataFrame whose partition count equals the sum of df1 and df2. I tried the following permutations on the same data and got the results below.
Case 1
from pyspark.sql.types import IntegerType
df1 = spark.createDataFrame(range(100000), IntegerType())
print("df1 partitions: %d" %df1.rdd.getNumPartitions())
print("df1 partitioner: %s" %df1.rdd.partitioner)
df2 = spark.createDataFrame(range(100000), IntegerType())
print("df2 partitions: %d" %df2.rdd.getNumPartitions())
print("df2 partitioner: %s" %df2.rdd.partitioner)
df3 = df1.union(df2)
print("df3 partitions: %d" %df3.rdd.getNumPartitions())
print("df3 partitioner: %s" %df3.rdd.partitioner)
***************
df1 partitions: 8
df1 partitioner: None
df2 partitions: 8
df2 partitioner: None
df3 partitions: 16
df3 partitioner: None
Case 2
val df1 = (1 to 100000).toDF
println(s"df1 partitions: ${df1.rdd.getNumPartitions}")
println(s"df1 partitioner: ${df1.rdd.partitioner}")
val df2 = (1 to 100000).toDF
println(s"df2 partitions: ${df2.rdd.getNumPartitions}")
println(s"df2 partitioner: ${df2.rdd.partitioner}")
df1.union(df2).explain()
val df3 = df1.union(df2)
println(s"df3 partitions: ${df3.rdd.getNumPartitions}")
println(s"df3 partitioner: ${df3.rdd.partitioner}")
***************
df1 partitions: 8
df1 partitioner: None
df2 partitions: 8
df2 partitioner: None
df3 partitions: 16
df3 partitioner: None
Case 3
val df1 = (1 to 100000).toDF
println(s"df1 partitions: ${df1.rdd.getNumPartitions}")
println(s"df1 partitioner: ${df1.rdd.partitioner}")
val df2 = (1 to 100000 by 2).toDF
println(s"df2 partitions: ${df2.rdd.getNumPartitions}")
println(s"df2 partitioner: ${df2.rdd.partitioner}")
val df3 = df1.union(df2)
println(s"df3 partitions: ${df3.rdd.getNumPartitions}")
println(s"df3 partitioner: ${df3.rdd.partitioner}")
***************
df1 partitions: 8
df1 partitioner: None
df2 partitions: 8
df2 partitioner: None
df3 partitions: 16
df3 partitioner: None