Spark RowMatrix columnSimilarities保留原始索引

问题描述 投票:1回答:1

我有以下Scala Spark DataFrame dfString, Array[Double]):注意id是String类型(base64 hash)

id, values
"a", [0.5, 0.6]
"b", [0.1, 0.2]
...

数据集非常大(45k),我想使用org.apache.spark.mllib.linalg.distributed.RowMatrix执行成对的余弦相似性。这有效,但我无法识别成对的相似性,因为索引已变为整数(输出列i和j)。如何使用IndexedRowMatrix保留原始索引?

val rows = df.select("values")
            .rdd
            .map(_.getAs[org.apache.spark.ml.linalg.Vector](0))
            .map(org.apache.spark.mllib.linalg.Vectors.fromML)

val mat = new RowMatrix(rows)

val simsEstimate = mat.columnSimilarities()

理想情况下,最终结果应如下所示:

id_x, id_y, similarity
"a", "b", 0.9
"b", "c", 0.8
...
scala apache-spark dataframe vector apache-spark-mllib
1个回答
0
投票

columnSimilarities()计算RowMatrix列之间的相似性,而不是行之间的相似性,因此在这种情况下,你所拥有的“ids”是无意义的,而索引是每个特征向量中的索引。

此外,这些方法是针对长,窄和数据而设计的,因此一种显而易见的方法 - 用id编码StringIndexer,创建IndedxedRowMatrix,转置,计算相似性,然后返回(使用IndexToString)根本不会这样做。

你最好的选择是采取crossJoin

df.as("a").crossJoin(df.as("b")).where($"a.id" <= $"b.id").select(
  $"a.id" as "id_x", $"b.id" as "id_y", cosine_similarity($"a.values", $b.values")
)

哪里

val cosine_similarity = udf((xs: Array[Double], ys: Array[Double]) => ???)

是你必须自己实现的东西。

或者,您可以爆炸数据:

import org.apache.spark.sql.functions.posexplode

val long = ds.select($"id", posexplode($"values")).toDF("item", "feature", "value")

然后使用Spark Scala - How to group dataframe rows and apply complex function to the groups?中显示的方法来计算相似度。

© www.soinside.com 2019 - 2024. All rights reserved.