I have read the Spark section on locality-sensitive hashing, but I still don't understand part of it:
https://spark.apache.org/docs/latest/ml-features.html#locality-sensitive-hashing
and the bucketed random projection example with two DataFrames. I have a simple dataset of points in space (of course, later I will have millions of points), and the DataFrame looks like:
id       X        Y
1 11.6133 48.1075
2 11.6142 48.1066
3 11.6108 48.1061
4 11.6207 48.1192
5 11.6221 48.1223
6 11.5969 48.1276
7 11.5995 48.1258
8 11.6127 48.1066
9 11.6430 48.1275
10 11.6368 48.1278
11 11.5930 48.1156
My question is: how can I put points that are close to each other into the same group, so that my original DataFrame gets an additional column with this hash / group id?
Best, Marcin
BucketedRandomProjectionLSH does exactly what you need. The resulting hash of each point can serve as a group value. The only problem is choosing the right radius, which sets the size of each bucket; it is set with .setBucketLength(0.02). Another small problem is extracting the hashes from a vector into a column. For that I used the approach from: Spark Scala: How to convert Dataframe[vector] to DataFrame[f1:Double, ..., fn: Double)]
Data example:
import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.functions.udf
import spark.implicits._ // for the $ column syntax (already in scope in spark-shell)

val dfA = spark.createDataFrame(Seq(
  (1, Vectors.dense(11.6133, 48.1075)),
  (2, Vectors.dense(11.6142, 48.1066)),
  (3, Vectors.dense(11.6108, 48.1061)),
  (4, Vectors.dense(11.6207, 48.1192)),
  (5, Vectors.dense(11.6221, 48.1223)),
  (6, Vectors.dense(11.5969, 48.1276)),
  (7, Vectors.dense(11.5995, 48.1258)),
  (8, Vectors.dense(11.6127, 48.1066)),
  (9, Vectors.dense(11.6430, 48.1275)),
  (10, Vectors.dense(11.6368, 48.1278)),
  (11, Vectors.dense(11.5930, 48.1156))
)).toDF("id", "coord")

val brp = new BucketedRandomProjectionLSH()
  .setBucketLength(0.02)
  .setNumHashTables(1)
  .setInputCol("coord")
  .setOutputCol("hashes")

val model = brp.fit(dfA)
val res = model.transform(dfA)

// "hashes" is an array of vectors (one vector per hash table);
// extract the single bucket id into a plain Double column
val vecToSeq = udf((v: Vector) => v.toArray).asNondeterministic
res.select($"id", vecToSeq($"hashes"(0))(0) as "bucket").show
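To answer the original question directly, the bucket can also be kept as an extra column next to the original coordinates. A minimal sketch building on res and vecToSeq above (the column name "group" is my own choice, not part of the LSH API):

// keep the original columns and append the bucket id as a "group" column
val grouped = res.select($"id", $"coord", vecToSeq($"hashes"(0))(0) as "group")

// points in the same bucket now share a group value
grouped.groupBy("group").count.show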
The output gives 2 groups for a radius of 0.02:
+---+------+
| id|bucket|
+---+------+
| 1|2473.0|
| 2|2473.0|
| 3|2473.0|
| 4|2474.0|
| 5|2474.0|
| 6|2474.0|
| 7|2474.0|
| 8|2473.0|
| 9|2474.0|
| 10|2474.0|
| 11|2473.0|
+---+------+
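If you need explicit pairs of nearby points rather than fixed buckets, the fitted model also offers approxSimilarityJoin, part of the same LSH API described in the docs linked in the question. A sketch, where the 0.005 distance threshold is only an illustrative value:

// self-join: all pairs of points whose Euclidean distance is below 0.005
model.approxSimilarityJoin(dfA, dfA, 0.005, "dist")
  .select($"datasetA.id" as "idA", $"datasetB.id" as "idB", $"dist")
  .filter($"idA" < $"idB") // drop self-pairs and mirrored duplicates
  .show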
Here is some Scala code that performs the LSH. Basically, the LSH needs a combined vector, which can be constructed with VectorAssembler.
import org.apache.spark.ml.feature.{BucketedRandomProjectionLSH, VectorAssembler}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.functions.{col, udf}
import spark.implicits._ // for .toDF (already in scope in spark-shell)

// constructing the dataframe
val data = """1 11.6133 48.1075
2 11.6142 48.1066
3 11.6108 48.1061
4 11.6207 48.1192
5 11.6221 48.1223
6 11.5969 48.1276
7 11.5995 48.1258
8 11.6127 48.1066
9 11.6430 48.1275
10 11.6368 48.1278
11 11.5930 48.1156"""

val df = data
  .split("\\s*\\n\\s*")
  .map(_.split("\\s+") match {
    case Array(a, b, c) => (a.toInt, b.toDouble, c.toDouble)
  })
  .toSeq
  .toDF("id", "X", "Y")

// combine the X and Y columns into the single vector column LSH expects
val assembler = new VectorAssembler()
  .setInputCols(Array("X", "Y"))
  .setOutputCol("v")
val df2 = assembler.transform(df)

val lsh = new BucketedRandomProjectionLSH()
  .setInputCol("v")
  .setBucketLength(1e-3) // change that according to your use case
  .setOutputCol("lsh")
val result = lsh.fit(df2).transform(df2).orderBy("lsh")

// the lsh is in an array of vectors. To extract the double, we can use
// getItem for the array and a UDF for the vector.
val extract = udf((vector: Vector) => vector(0))
result.withColumn("lsh", extract(col("lsh").getItem(0))).show(false)
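As in the first answer, the extracted bucket value can serve directly as a group label. A short follow-up sketch collecting the ids that landed in each bucket (collect_list is a standard Spark SQL function; note that with the tiny bucket length of 1e-3 most buckets may hold a single point, so tune it to your data):

import org.apache.spark.sql.functions.collect_list

result
  .withColumn("group", extract(col("lsh").getItem(0)))
  .groupBy("group")
  .agg(collect_list("id") as "ids")
  .show(false)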