下面的代码,因此有关性能的问题 - 当然是大规模的想象:
import org.apache.spark.sql.types.StructType
val df = sc.parallelize(Seq(
("r1", 1, 1),
("r2", 6, 4),
("r3", 4, 1),
("r4", 1, 2)
)).toDF("ID", "a", "b")
val ones = df.schema.map(c => c.name).drop(1).map(x => when(col(x) === 1, 1).otherwise(0)).reduce(_ + _)
// or
def ones = df.schema.map(c => c.name).drop(1).map(x => when(col(x) === 1, 1).otherwise(0)).reduce(_ + _)
df.withColumn("ones", ones).explain
这里有两个使用def和val的物理计划 - 它们是相同的:
== Physical Plan == **def**
*(1) Project [_1#760 AS ID#764, _2#761 AS a#765, _3#762 AS b#766, (CASE WHEN (_2#761 = 1) THEN 1 ELSE 0 END + CASE WHEN (_3#762 = 1) THEN 1 ELSE 0 END) AS ones#770]
+- *(1) SerializeFromObject [staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._1, true, false) AS _1#760, assertnotnull(input[0, scala.Tuple3, true])._2 AS _2#761, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#762]
+- Scan[obj#759]
== Physical Plan == **val**
*(1) Project [_1#780 AS ID#784, _2#781 AS a#785, _3#782 AS b#786, (CASE WHEN (_2#781 = 1) THEN 1 ELSE 0 END + CASE WHEN (_3#782 = 1) THEN 1 ELSE 0 END) AS ones#790]
+- *(1) SerializeFromObject [staticinvoke(class
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple3, true])._1, true, false) AS _1#780, assertnotnull(input[0, scala.Tuple3, true])._2 AS _2#781, assertnotnull(input[0, scala.Tuple3, true])._3 AS _3#782]
+- Scan[obj#779]
所以,有讨论:
val vs def表现。
然后:
对于-1er我问这个因为以下是非常明确的,但是val的代码比下面的代码更多,而且下面没有迭代:
var x = 2 // using var as I need to change it to 3 later
val sq = x*x // evaluates right now
x = 3 // no effect! sq is already evaluated
println(sq)
这里有两个核心概念,Spark DAG创建和评估,以及Scala的val
和def
定义,这些是正交的
我认为.explains没有区别
您没有看到任何区别,因为从Spark的角度来看,查询是相同的。如果您将图表存储在val
中,或者每次使用def
创建图表,则与分析器无关。
来自其他地方:val在定义时评估,def-在被调用时。
这是Scala语义。 val
是一个不可变的引用,它在声明站点进行一次评估。 def
代表方法定义,如果你在其中分配一个新的DataFrame
,它会在你每次调用时创建一个。例如:
def ones =
df
.schema
.map(c => c.name)
.drop(1)
.map(x => when(col(x) === 1, 1).otherwise(0))
.reduce(_ + _)
val firstcall = ones
val secondCall = ones
上面的代码将在DF上构建两个独立的DAG。
我假设在这里使用val或def没有区别,因为它基本上在一个循环中并且有一个减少。它是否正确?
我不确定你在谈论哪个循环,但请看上面的答案,区分两者。
每个数据帧行都会执行df.schema.map(c => c.name)。drop(1)吗?当然没有必要。 Catalyst会优化这个吗?
不,drop(1)
将在整个数据框架中发生,这实际上只会使它仅丢弃第一行。
如果上述情况是正确的,每次执行语句都要执行,那么我们怎样才能使这段代码只出现一次?我们应该制作val的val = df.schema.map(c => c.name)。drop(1)
每个数据帧只发生一次(在您的示例中我们只有一个)。
ones
表达式不会针对每个数据帧行进行评估,它将被评估一次。 def
得到每次通话评估。例如,如果使用该dataframe
表达式有3个ones
s,那么ones
表达式将被评估3次。 val
之间的区别在于表达式只会被评估一次。
基本上,ones
表达式创建了org.apache.spark.sql.Column
的org.apache.spark.sql.Column = (CASE WHEN (a = 1) THEN 1 ELSE 0 END + CASE WHEN (b = 1) THEN 1 ELSE 0 END)
实例。如果表达式是def
,则每次调用时都会实例化一个新的org.apache.spark.sql.Column
。如果表达式是val
,则反复使用相同的实例。