我想比较为pyspark和scala spark读取文件所需的时间。在Scala中,我正在做以下事情:
import org.apache.spark.sql.SparkSession
object Playground extends App {
def getDfFromFile(path: String, spark: SparkSession) = {
spark
.read
.format("csv")
.option("inferSchema", true)
.option("header", true)
.load(path)
}
val master = "local"
val spark = SparkSession.builder()
.master(master)
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
val N = 1
val avgTime = (0 until N)
.map(_ => {
val start = System.nanoTime()
getDfFromFile("resources/bigdatafile.csv", spark)
val end = System.nanoTime()
end - start
}).sum / Math.pow(10, 9)
println(avgTime)
}
使用N = 1
这需要大约2.7秒。如果我将它增加到10,则需要大约3.7秒。使用N = 100
,需要9.7秒。
显然,N的时间增长不是线性的。为什么不呢?我能做些什么才能使它成为线性的,以便我可以有效地比较Spark和PySpark中第一次读取文件所需的次数?
编辑:
如果我更改代码以在map函数内创建sparkSession:
val N = 100
val avgTime = (0 until N)
.map(_ => {
val spark = SparkSession.builder()
.master(master)
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
val start = System.nanoTime()
getDfFromFile("resources/bigdatafile.csv", spark)
val end = System.nanoTime()
spark.close()
end - start
}).sum / Math.pow(10, 9)
它对结果几乎没有影响。
Spark执行策略是懒惰的,在你的情况下,它意味着当你对它应用一个动作操作(收集,接收......)时,spark会从文件中加载数据,但是你的代码只是计算开始和结束时间并聚合它所以不需要从文件加载数据!
对于时间评估,您应该对文件数据应用一些逻辑(包括操作操作),例如,您可以应用字数统计算法,甚至可以只打印任何行。