DataFrame 使用 scala 在 Spark 中微风 DenseMatrix

问题描述 投票:0回答:3

我正在尝试使用 scala 将数据帧转换为微风密集矩阵。我找不到任何内置函数来执行此操作,所以这就是我正在做的事情。

import scala.util.Random
import breeze.linalg.DenseMatrix

val featuresDF = (1 to 10)
    .map(_ => (
      Random.nextDouble,Random.nextDouble,Random.nextDouble))
    .toDF("F1", "F2", "F3")

var FeatureArray: Array[Array[Double]] = Array.empty
val features = featuresDF.columns

for(i <- features.indices){
    FeatureArray = FeatureArray :+ featuresDF.select(features(i)).collect.map(_(0).toString).map(_.toDouble)
}

val desnseMat = DenseMatrix(FeatureArray: _*).t

这确实工作正常,我得到了我想要的。但是,这会在我的环境中导致 OOM 异常。有没有更好的方法来进行这种转换。我的最终目标是使用稠密矩阵计算特征的特征值和特征向量。

import breeze.stats.covmat
import breeze.linalg.eig

val covariance = covmat(desnseMat)
val eigen = eig(covariance)

因此,如果有一种直接的方法可以从数据帧中获取特征值和特征向量,那就更好了。 Spark ml 中的 PCA 必须使用 features 列进行此计算。有没有办法通过PCA获取特征值?

scala apache-spark eigenvalue covariance-matrix
3个回答
1
投票

首先,尝试增加你的内存。

其次,尝试使用 Spark 中的 DenseMatrix 函数之一。 这两个功能在我的计算机上使用相同数量的 RAM。

我用了 1,34 秒来解析 DataFrame 中的 201238 行,其中 1 列每列包含多个 Double 值:

import org.apache.spark.mllib.linalg.DenseMatrix
import org.apache.spark.ml.linalg.DenseVector
import org.apache.spark.sql.DataFrame

def getDenseMatrixFromDF(featuresDF:DataFrame):DenseMatrix = {
    val featuresTrain = featuresDF.columns
    val rows = featuresDF.count().toInt

    val newFeatureArray:Array[Double] = featuresTrain
       .indices
       .flatMap(i => featuresDF
       .select(featuresTrain(i))
       .collect())
       .map(r => r.toSeq.toArray).toArray.flatten.flatMap(_.asInstanceOf[org.apache.spark.ml.linalg.DenseVector].values)

    val newCols = newFeatureArray.length / rows
    val denseMat:DenseMatrix = new DenseMatrix(rows, newCols, newFeatureArray, isTransposed=false)
    denseMat
}

如果我想从 DataFrame 中获取一个 DenseVector,其中一列仅包含一个 Double 值,那么对于相同数量的数据,我需要 0.8 秒:

import org.apache.spark.mllib.linalg.DenseVector
import org.apache.spark.ml.linalg.DenseVector
import org.apache.spark.sql.DataFrame

def getDenseVectorFromDF(featuresDF:DataFrame):DenseVector = {
    val featuresTrain = featuresDF.columns
    val cols = featuresDF.columns.length

    cols match {
      case i if i>1 => throw new IllegalArgumentException
      case _ => {
        def addArray(acc:Array[Array[Double]],cur:Array[Double]):Array[Array[Double]] = {
          acc :+ cur
        }

        val newFeatureArray:Array[Double] = featuresTrain
          .indices
          .flatMap(i => featuresDF
          .select(featuresTrain(i))
          .collect())
          .map(r => r.toSeq.toArray.map(e => e.asInstanceOf[Double])).toArray.flatten

        val denseVec:DenseVector = new DenseVector(newFeatureArray)
        denseVec
   }
}

要计算特征值/特征向量,只需检查此链接此API链接

计算协方差矩阵 chek 此链接此 API 链接


1
投票
def getDenseMatrixFromDF(featuresDF:DataFrame):BDM[Double] = {
    val featuresTrain = featuresDF.columns
    val cols = featuresTrain.length
    val rows = featuresDF.count().toInt
    val denseMat: BDM[Double] = BDM.tabulate(rows,cols)((i, j)=>{
        featuresDF.collect().apply(i).getAs[Double](j)
        })
    denseMat
  }

0
投票

根据 @Catalina Chiru 的回答,我修改了它以使用哪些 2D 数据集,因为他们的第一个方法给了我错误。

import org.apache.spark.ml.linalg.DenseMatrix

  def getDenseMatrixFromDF(featuresDF: DataFrame): DenseMatrix = {
    val columnNames = featuresDF.columns
    val rows = featuresDF.count().toInt

    val newFeatureArray = columnNames
      .indices
      .flatMap(i => featuresDF
        .select(columnNames(i))
        .collect()).map(r => r.getDouble(0)).toArray

    val newCols = newFeatureArray.length / rows
    val denseMat: DenseMatrix = new DenseMatrix(rows, newCols, newFeatureArray, isTransposed = false)
    denseMat
  }
© www.soinside.com 2019 - 2024. All rights reserved.