我有以下Scala Spark DataFrame df
(String, Array[Double]
):注意id
是String类型(base64 hash)
id, values
"a", [0.5, 0.6]
"b", [0.1, 0.2]
...
数据集非常大(45k),我想使用org.apache.spark.mllib.linalg.distributed.RowMatrix
执行成对的余弦相似性。这有效,但我无法识别成对的相似性,因为索引已变为整数(输出列i和j)。如何使用IndexedRowMatrix
保留原始索引?
val rows = df.select("values")
.rdd
.map(_.getAs[org.apache.spark.ml.linalg.Vector](0))
.map(org.apache.spark.mllib.linalg.Vectors.fromML)
val mat = new RowMatrix(rows)
val simsEstimate = mat.columnSimilarities()
理想情况下,最终结果应如下所示:
id_x, id_y, similarity
"a", "b", 0.9
"b", "c", 0.8
...
columnSimilarities()
计算RowMatrix
列之间的相似性,而不是行之间的相似性,因此在这种情况下,你所拥有的“ids”是无意义的,而索引是每个特征向量中的索引。
此外,这些方法是针对长,窄和数据而设计的,因此一种显而易见的方法 - 用id
编码StringIndexer
,创建IndedxedRowMatrix
,转置,计算相似性,然后返回(使用IndexToString
)根本不会这样做。
你最好的选择是采取crossJoin
df.as("a").crossJoin(df.as("b")).where($"a.id" <= $"b.id").select(
$"a.id" as "id_x", $"b.id" as "id_y", cosine_similarity($"a.values", $b.values")
)
哪里
val cosine_similarity = udf((xs: Array[Double], ys: Array[Double]) => ???)
是你必须自己实现的东西。
或者,您可以爆炸数据:
import org.apache.spark.sql.functions.posexplode
val long = ds.select($"id", posexplode($"values")).toDF("item", "feature", "value")
然后使用Spark Scala - How to group dataframe rows and apply complex function to the groups?中显示的方法来计算相似度。