如何从Scala中的DataFrame在Spark中创建分布式稀疏矩阵

问题描述 投票:3回答:2

问题

[请帮助寻找从DataFrame中的(用户,要素,值)记录创建分布矩阵的方法,在要素框中将要素及其值存储在列中。

数据摘录在下面,但是有大量的用户和功能,并且没有为用户测试所有功能。因此,许多要素值都是空的,将被归为0。

例如,血液检查可以具有糖水平胆固醇水平等作为特征。如果这些水平不可接受,则将值设置为1。但是,并非所有功能都将针对用户(或患者)进行测试。

+----+-------+-----+
|user|feature|value|
+----+-------+-----+
|  14|      0|    1|
|  14|    222|    1|
|  14|    200|    1|
|  22|      0|    1|
|  22|     32|    1|
|  22|    147|    1|
|  22|    279|    1|
|  22|    330|    1|
|  22|    363|    1|
|  22|    162|    1|
|  22|    811|    1|
|  22|    290|    1|
|  22|    335|    1|
|  22|    681|    1|
|  22|    786|    1|
|  22|    789|    1|
|  22|    842|    1|
|  22|    856|    1|
|  22|    881|    1|
+----+-------+-----+

如果要素是重要的列,则有一些解释方法。

但是事实并非如此。因此,一种方法可能是透视数据框以应用这些方法。

+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|user|  0| 32|147|162|200|222|279|290|330|335|363|681|786|789|811|842|856|881|
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|  14|  1|  0|  0|  0|  1|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
|  22|  1|  1|  1|  1|  0|  0|  1|  1|  1|  1|  1|  1|  1|  1|  1|  1|  1|  1|
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+

然后使用行进行矢量转换。我想使用其中之一:

  • VectorAssembler
  • org.apache.spark.mllib.linalg.Vectors.fromML
  • org.apache.spark.mllib.linalg.distributed.MatrixEntry

但是,由于将有许多空要素值被估算为0,并且数据透视表将占用更多空间。同样,枢转分布在多个节点之间的大型数据帧也会导致大型改组。

因此,寻求建议,想法,建议。

相关

环境

火花2.4.4

scala apache-spark sparse-matrix apache-spark-mllib
2个回答
0
投票

也许您可以将每一行转换成json表示形式,例如:

{ 
  "user": 14
  "features" : [
    {
      "feature" : 0
      "value"   : 1
    },
    {
      "feature" : 222
      "value"   : 1
    }
  ]
}

但是所有取决于您以后如何使用“分布式矩阵”。


0
投票

解决方案

  1. 为每个输入行创建一个RDD [(用户,功能)]。>>
  2. groupByKey创建RDD [(user,[feature +])]。
  3. 创建一个RDD [IndexedRow],其中每个IndexedRow代表以下所有现有功能。
  4. +----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
    |user|  0| 32|147|162|200|222|279|290|330|335|363|681|786|789|811|842|856|881|
    +----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
    |  14|  1|  0|  0|  0|  1|  1|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|
    
  1. 将RDD [IndexedRow]转换为IndexedRowMatrix。
  2. 对于产品操作,将RowIndexedMatrix转换为支持分布式操作产品操作的BlockMatrix。

将每个原始记录转换为IndexedRow
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.linalg.distributed._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

def toIndexedRow(userToFeaturesMap:(Int, Iterable[Int]), maxFeatureId: Int): IndexedRow = {
    userToFeaturesMap match {
        case (userId, featureIDs) => {
            val featureCountKV = featureIDs.map(i => (i, 1.0)).toSeq
            new IndexedRow (
                userId,
                Vectors.sparse(maxFeatureId + 1, featureCountKV)
            )
        }
    }
}

val userToFeatureCounters= featureData.rdd
    .map(rowPF => (rowPF.getInt(0), rowPF.getInt(1)))  // Out from ROW[(userId, featureId)]
    .groupByKey()                                      // (userId, Iterable(featureId))
    .map(
        userToFeatureIDsMap => toIndexedRow(userToFeatureIDsMap, maxFeatureId)
    )                                                 // IndexedRow(userId, Vector((featureId, 1)))

创建的IndexedRowMatrix
val userFeatureIndexedMatrix = new IndexedRowMatrix(userToFeatureCounters)

通过BlockMatrix进行Trasponsed IndexedRowMatrix,因为IndexedRowMatrix不支持转置
val userFeatureBlockMatrixTransposed = userFeatureBlockMatrix
    .transpose

BlockMatrix为IndexedRowMatrix的已创建产品在右侧需要Local DenseMatrix。
val featuresTogetherIndexedMatrix = userFeatureBlockMatrix
    .multiply(userFeatureBlockMatrixTransposed)
    .toIndexedRowMatrix

© www.soinside.com 2019 - 2024. All rights reserved.