Train ML models per partition on Spark, so that each partition of the DataFrame gets its own trained model

Question description · votes: 0 · answers: 1

How can I do parallel, per-partition model training in Spark with Scala? The solution given here is in PySpark; I am looking for a Scala solution: How to efficiently build one ML model per partition in Spark with foreachPartition?

apache-spark apache-spark-ml
1个回答
0 votes
  1. Get the distinct values of the partition column.
  2. Create a thread pool with 100 threads.
  3. Create a Future for each partition value and run it on the pool.

Sample code:

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration.Duration

    // Get an ExecutionContext backed by a pool of 100 threads; see
    // https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/param/shared/HasParallelism.scala#L50
    val threadPoolExecutorService = getExecutionContext("name", 100)

    // Distinct values of the partition column.
    val uniquePartitionValues: List[String] = ...

    // Asynchronous invocation of training; the results are collected from the futures.
    val uniquePartitionValuesFutures = uniquePartitionValues.map { partitionValue =>
      Future[Double] {
        try {
          // Get the rows where partitionCol = partitionValue (quote string values).
          val partitionDF = mainDF.where(s"partitionCol = '$partitionValue'")
          // Preprocess and train with any algorithm on partitionDF; return its accuracy.
          ...
        } catch {
          case e: Exception =>
            ... // log the failure for this partition and return a sentinel metric
        }
      }(threadPoolExecutorService)
    }

    // Wait for all metrics to be computed.
    val foldMetrics = uniquePartitionValuesFutures.map(Await.result(_, Duration.Inf))
    println(s"output::${foldMetrics.mkString("  ###  ")}")
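The snippet above calls a `getExecutionContext` helper without defining it. Below is a minimal sketch of such a helper, modelled on Spark's `HasParallelism.getExecutionContext` linked above; the `name` parameter and the pool size are assumptions, and the usage example is plain Scala with no Spark dependency:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration.Duration

// Hypothetical helper: wrap a fixed-size thread pool in an ExecutionContext so
// that at most `parallelism` futures run concurrently. `name` is kept only for
// parity with the call site above; a custom ThreadFactory could use it to name
// the pool's threads.
def getExecutionContext(name: String, parallelism: Int): ExecutionContextExecutorService =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(parallelism))

// Usage: run independent tasks on the pool and block for all results,
// mirroring the map-over-futures / Await.result pattern above.
val ec = getExecutionContext("training-pool", 4)
val futures = (1 to 3).map(i => Future(i * i)(ec))
val results = futures.map(Await.result(_, Duration.Inf))
println(results.mkString(","))  // 1,4,9
ec.shutdown()  // release the pool's threads so the JVM can exit
```

Returning `ExecutionContextExecutorService` rather than plain `ExecutionContext` lets the caller shut the pool down once all futures have completed.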