对Spark Dataframe的val vs def性能

问题描述 投票:1回答:2

下面的代码,因此有关性能的问题 - 当然是大规模的想象:

import org.apache.spark.sql.types.StructType

val df = sc.parallelize(Seq(
   ("r1", 1, 1),
   ("r2", 6, 4),
   ("r3", 4, 1),
   ("r4", 1, 2)
   )).toDF("ID", "a", "b")

val ones = df.schema.map(c => c.name).drop(1).map(x => when(col(x) === 1, 1).otherwise(0)).reduce(_ + _)

// or

def ones = df.schema.map(c => c.name).drop(1).map(x => when(col(x) === 1, 1).otherwise(0)).reduce(_ + _)

df.withColumn("ones", ones).explain

这里有两个使用def和val的物理计划 - 它们是相同的:

 == Physical Plan == **def**
 *(1) Project [_1#760 AS ID#764, _2#761 AS a#765, _3#762 AS b#766, (CASE WHEN (_2#761 = 1) THEN 1 ELSE 0 END + CASE WHEN (_3#762 = 1) THEN 1 ELSE 0 END) AS ones#770]
 +- *(1) SerializeFromObject [staticinvoke(class 
 org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._1, true, false) AS _1#760, assertnotnull(input[0, scala.Tuple3, true])._2 AS _2#761, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#762]
   +- Scan[obj#759]


 == Physical Plan == **val**
 *(1) Project [_1#780 AS ID#784, _2#781 AS a#785, _3#782 AS b#786, (CASE WHEN (_2#781 = 1) THEN 1 ELSE 0 END + CASE WHEN (_3#782 = 1) THEN 1 ELSE 0 END) AS ones#790]
 +- *(1) SerializeFromObject [staticinvoke(class 
 org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._1, true, false) AS _1#780, assertnotnull(input[0, scala.Tuple3, true])._2 AS _2#781, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#782]
    +- Scan[obj#779] 

所以,有讨论:

val vs def表现。

然后:

  • 我认为.explains没有区别。好。
  • 来自其他地方:val在定义时评估,def-在被调用时。
  • 我假设在这里使用val或def没有区别,因为它基本上在一个循环中并且有一个减少。它是否正确?
  • 每个数据帧行都会执行df.schema.map(c => c.name)。drop(1)吗?当然没有必要。 Catalyst会优化这个吗?
  • 如果上述情况是正确的,每次执行语句都要执行,那么我们怎样才能使这段代码只出现一次?我们应该制作val的val = df.schema.map(c => c.name)。drop(1)
  • val,def比Scala更多,也是Spark组件。

对于-1er我问这个因为以下是非常明确的,但是val的代码比下面的代码更多,而且下面没有迭代:

var x = 2 // using var as I need to change it to 3 later
val sq = x*x // evaluates right now
x = 3 // no effect! sq is already evaluated
println(sq)
scala apache-spark
2个回答
5
投票

这里有两个核心概念,Spark DAG创建和评估,以及Scala的valdef定义,这些是正交的

我认为.explains没有区别

您没有看到任何区别,因为从Spark的角度来看,查询是相同的。如果您将图表存储在val中,或者每次使用def创建图表,则与分析器无关。

来自其他地方:val在定义时评估,def-在被调用时。

这是Scala语义。 val是一个不可变的引用,它在声明站点进行一次评估。 def代表方法定义,如果你在其中分配一个新的DataFrame,它会在你每次调用时创建一个。例如:

def ones = 
  df
   .schema
   .map(c => c.name)
   .drop(1)
   .map(x => when(col(x) === 1, 1).otherwise(0))
   .reduce(_ + _)

val firstcall = ones
val secondCall = ones

上面的代码将在DF上构建两个独立的DAG。

我假设在这里使用val或def没有区别,因为它基本上在一个循环中并且有一个减少。它是否正确?

我不确定你在谈论哪个循环,但请看上面的答案,区分两者。

每个数据帧行都会执行df.schema.map(c => c.name)。drop(1)吗?当然没有必要。 Catalyst会优化这个吗?

不,drop(1)将在整个数据框架中发生,这实际上只会使它仅丢弃第一行。

如果上述情况是正确的,每次执行语句都要执行,那么我们怎样才能使这段代码只出现一次?我们应该制作val的val = df.schema.map(c => c.name)。drop(1)

每个数据帧只发生一次(在您的示例中我们只有一个)。


1
投票

ones表达式不会针对每个数据帧行进行评估,它将被评估一次。 def得到每次通话评估。例如,如果使用该dataframe表达式有3个oness,那么ones表达式将被评估3次。 val之间的区别在于表达式只会被评估一次。

基本上,ones表达式创建了org.apache.spark.sql.Columnorg.apache.spark.sql.Column = (CASE WHEN (a = 1) THEN 1 ELSE 0 END + CASE WHEN (b = 1) THEN 1 ELSE 0 END)实例。如果表达式是def,则每次调用时都会实例化一个新的org.apache.spark.sql.Column。如果表达式是val,则反复使用相同的实例。

© www.soinside.com 2019 - 2024. All rights reserved.