如何在spark中用scala做每个分区的并行模型训练?这里给出的解决方案是在Pyspark中。我在寻找scala中的解决方案。如何在Spark中用foreachPartition高效地每个分区建立一个ML模型?
示例代码如下
// Get an ExecutorService
val threadPoolExecutorService = getExecutionContext("name", 100)
// check https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/param/shared/HasParallelism.scala#L50
val uniquePartitionValues: List[String] = ...//getDistingPartitionsUsingPartitionCol
// Asynchronous invocation to training. The result will be collected from the futures.
val uniquePartitionValuesFutures = uniquePartitionValues.map(partitionValue => {
Future[Double] {
try {
// get dataframe where partitionCol=partitionValue
val partitionDF = mainDF.where(s"partitionCol=$partitionValue")
// do preprocessing and training using any algo with an input partitionDF and return accuracy
} catch {
....
}(threadPoolExecutorService)
})
// Wait for metrics to be calculated
val foldMetrics = uniquePartitionValuesFutures.map(Await.result(_, Duration.Inf))
println(s"output::${foldMetrics.mkString(" ### ")}")