在Apache Spark中尝试将Dataframe写入CSV时,行为不一致。

问题描述 投票:0回答:1

我试图用Dataframes和spark-csv将我用Spark的MLlib训练的一个决策树分类器的最优超参数输出到csv文件。这是我的一段代码。

// Split the data into training and test sets (10% held out for testing)
val Array(trainingData, testData) = assembledData.randomSplit(Array(0.9, 0.1))

// Define cross validation with a hyperparameter grid
val crossval = new CrossValidator()
    .setEstimator(classifier)
    .setEstimatorParamMaps(paramGrid)
    .setEvaluator(new BinaryClassificationEvaluator)
    .setNumFolds(10)

// Train model
val model = crossval.fit(trainingData)

// Find best hyperparameter combination and create an RDD 
val bestModel = model.bestModel
val hyperparamList = new ListBuffer[(String, String)]()
bestModel.extractParamMap().toSeq.foreach(pair => {
    val hyperparam: Tuple2[String,String] = (pair.param.name,pair.value.toString)
    hyperparamList += hyperparam
})
val hyperparameters = sqlContext.sparkContext.parallelize(hyperparamList.toSeq)

// Print the best hyperparameters 
println(bestModel.extractParamMap().toSeq.foreach(pair => {
    println(s"${pair.param.parent} ${pair.param.name}")
    println(pair.value)
}))

// Define csv path to output results
var csvPath: String  = "/root/results/decision-tree"
val hyperparametersPath: String = csvPath+"/hyperparameters"
val hyperparametersFile: File = new File(hyperparametersPath)
val results = (hyperparameters, hyperparametersPath, hyperparametersFile)

// Convert RDD to Dataframe and write it as csv 
val dfToSave = spark.createDataFrame(results._1.map(x => Row(x._1, x._2)))
dfToSave.write.format("csv").mode("overwrite").save(results._2)

// Stop spark session
spark.stop()

在完成Spark作业后,我可以在路径中看到part-00*... 和_SUCCESS文件,就像预期的那样。然而,虽然在这种情况下总共有13个超参数(通过在屏幕上打印它们来确认)。cat-从csv文件中可以看出,并不是每个超参数都被写入了csv。

user@master:~$ cat /root/results/decision-tree/hyperparameters/part*.csv
checkpointInterval,10
featuresCol,features
maxDepth,5
minInstancesPerNode,1

而且,那些被写入的超参数在每次执行时都会发生变化。这是在一个基于HDFS的Spark集群上执行的,有1个主站和3个工站,它们的硬件完全相同。这会不会是一个竞赛条件?如果是,如何解决?

先谢谢你。

csv apache-spark apache-spark-sql apache-spark-mllib spark-csv
1个回答
0
投票

我想我想通了。我希望 dfTosave.write.format("csv")save(path) 把所有的东西都写到主节点上,但是由于任务是分配给所有的工作者的,所以每个工作者都会把它的超参数部分保存到文件系统中的本地CSV中。因为在我的例子中,主节点也是一个worker,我可以看到它的部分超参数。这种 "不一致的行为"(即在每次执行中看到不同的部分)是由Spark用于在worker之间分配分区的任何算法造成的。

我的解决方案是使用类似于以下的方法从所有worker中收集CSV scprsync 来构建完整的结果。

© www.soinside.com 2019 - 2024. All rights reserved.