[请帮助寻找从DataFrame中的(用户,要素,值)记录创建分布矩阵的方法,在要素框中将要素及其值存储在列中。
数据摘录在下面,但是有大量的用户和功能,并且没有为用户测试所有功能。因此,许多要素值都是空的,将被归为0。
例如,血液检查可以具有糖水平,胆固醇水平等作为特征。如果这些水平不可接受,则将值设置为1。但是,并非所有功能都将针对用户(或患者)进行测试。
+----+-------+-----+
|user|feature|value|
+----+-------+-----+
| 14| 0| 1|
| 14| 222| 1|
| 14| 200| 1|
| 22| 0| 1|
| 22| 32| 1|
| 22| 147| 1|
| 22| 279| 1|
| 22| 330| 1|
| 22| 363| 1|
| 22| 162| 1|
| 22| 811| 1|
| 22| 290| 1|
| 22| 335| 1|
| 22| 681| 1|
| 22| 786| 1|
| 22| 789| 1|
| 22| 842| 1|
| 22| 856| 1|
| 22| 881| 1|
+----+-------+-----+
如果要素是重要的列,则有一些解释方法。
但是事实并非如此。因此,一种方法可能是透视数据框以应用这些方法。
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|user| 0| 32|147|162|200|222|279|290|330|335|363|681|786|789|811|842|856|881|
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
| 14| 1| 0| 0| 0| 1| 1| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|
| 22| 1| 1| 1| 1| 0| 0| 1| 1| 1| 1| 1| 1| 1| 1| 1| 1| 1| 1|
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
然后使用行进行矢量转换。我想使用其中之一:
但是,由于将有许多空要素值被估算为0,并且数据透视表将占用更多空间。同样,枢转分布在多个节点之间的大型数据帧也会导致大型改组。
因此,寻求建议,想法,建议。
火花2.4.4
也许您可以将每一行转换成json表示形式,例如:
{
"user": 14
"features" : [
{
"feature" : 0
"value" : 1
},
{
"feature" : 222
"value" : 1
}
]
}
但是所有取决于您以后如何使用“分布式矩阵”。
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|user| 0| 32|147|162|200|222|279|290|330|335|363|681|786|789|811|842|856|881|
+----+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
| 14| 1| 0| 0| 0| 1| 1| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0| 0|
对于产品操作,将RowIndexedMatrix转换为支持分布式操作产品操作的BlockMatrix。
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.linalg.distributed._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
def toIndexedRow(userToFeaturesMap:(Int, Iterable[Int]), maxFeatureId: Int): IndexedRow = {
userToFeaturesMap match {
case (userId, featureIDs) => {
val featureCountKV = featureIDs.map(i => (i, 1.0)).toSeq
new IndexedRow (
userId,
Vectors.sparse(maxFeatureId + 1, featureCountKV)
)
}
}
}
val userToFeatureCounters= featureData.rdd
.map(rowPF => (rowPF.getInt(0), rowPF.getInt(1))) // Out from ROW[(userId, featureId)]
.groupByKey() // (userId, Iterable(featureId))
.map(
userToFeatureIDsMap => toIndexedRow(userToFeatureIDsMap, maxFeatureId)
) // IndexedRow(userId, Vector((featureId, 1)))
val userFeatureIndexedMatrix = new IndexedRowMatrix(userToFeatureCounters)
val userFeatureBlockMatrixTransposed = userFeatureBlockMatrix
.transpose
val featuresTogetherIndexedMatrix = userFeatureBlockMatrix
.multiply(userFeatureBlockMatrixTransposed)
.toIndexedRowMatrix