Locality Sensitive Hashing in Spark for a single DataFrame


I have read the Spark section on Locality Sensitive Hashing, but I still do not understand part of it:

https://spark.apache.org/docs/latest/ml-features.html#locality-sensitive-hashing

There is also a Bucketed Random Projection example for two DataFrames. I have a simple dataset of points in space (of course, later I will have millions of points), and the DataFrame looks like:

  X        Y
id                  
1   11.6133  48.1075
2   11.6142  48.1066
3   11.6108  48.1061
4   11.6207  48.1192
5   11.6221  48.1223
6   11.5969  48.1276
7   11.5995  48.1258
8   11.6127  48.1066
9   11.6430  48.1275
10  11.6368  48.1278
11  11.5930  48.1156

My question is: how do I put points that are close to each other into the same group, so that my original DataFrame gets an additional column with this hash / group id?

Best, Marcin

apache-spark pyspark apache-spark-mllib
2 Answers

1 vote

BucketedRandomProjectionLSH does exactly what you need. The resulting hash of each point can serve as a group value. The only problem is choosing a proper bucket length, which sets the size of each bucket; set it with .setBucketLength(0.02). The other small problem is extracting the hash from a vector into a column. I used this approach: Spark Scala: How to convert Dataframe[vector] to DataFrame[f1:Double, ..., fn: Double)]

Example with your data:

import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.functions.udf
import spark.implicits._   // for the $"..." column syntax

// sample points as (id, feature vector)
val dfA = spark.createDataFrame(Seq(
  (1, Vectors.dense(11.6133, 48.1075)),
  (2, Vectors.dense(11.6142, 48.1066)),
  (3, Vectors.dense(11.6108, 48.1061)),
  (4, Vectors.dense(11.6207, 48.1192)),
  (5, Vectors.dense(11.6221, 48.1223)),
  (6, Vectors.dense(11.5969, 48.1276)),
  (7, Vectors.dense(11.5995, 48.1258)),
  (8, Vectors.dense(11.6127, 48.1066)),
  (9, Vectors.dense(11.6430, 48.1275)),
  (10, Vectors.dense(11.6368, 48.1278)),
  (11, Vectors.dense(11.5930, 48.1156))
  )).toDF("id", "coord")

// bucketLength controls the size of each bucket (group)
val brp = new BucketedRandomProjectionLSH()
  .setBucketLength(0.02)
  .setNumHashTables(1)
  .setInputCol("coord")
  .setOutputCol("hashes")
val model = brp.fit(dfA)

val res = model.transform(dfA)

// UDF that unpacks a Vector into an array so single hash values can be selected
val vecToSeq = udf((v: Vector) => v.toArray).asNondeterministic

// take the first hash value of the first (and only) hash table as the bucket id
res.select($"id", vecToSeq($"hashes"(0))(0) as "bucket").show

With a bucket length of 0.02, the output gives 2 groups:

  +---+------+
  | id|bucket|
  +---+------+
  |  1|2473.0|
  |  2|2473.0|
  |  3|2473.0|
  |  4|2474.0|
  |  5|2474.0|
  |  6|2474.0|
  |  7|2474.0|
  |  8|2473.0|
  |  9|2474.0|
  | 10|2474.0|
  | 11|2473.0|
  +---+------+
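
If you want the bucket as an additional column on your original X/Y DataFrame, as asked, one option is to join the bucket back on id. This is a minimal sketch continuing from the code above; dfXY and the column names X and Y are assumptions based on the question's layout, not part of the original answer:

// hypothetical continuation: rebuild an X/Y view of the data
// and join the bucket id back on id
val dfXY = dfA.select($"id",
  vecToSeq($"coord")(0) as "X",
  vecToSeq($"coord")(1) as "Y")

val withBucket = dfXY.join(
  res.select($"id", vecToSeq($"hashes"(0))(0) as "bucket"),
  Seq("id"))

withBucket.show()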

1 vote

Here is some Scala code that performs the LSH. Basically, the LSH needs a combined vector column, which can be constructed with a VectorAssembler.

import org.apache.spark.ml.feature.{BucketedRandomProjectionLSH, VectorAssembler}
import org.apache.spark.sql.functions.{col, udf}
import spark.implicits._   // for .toDF on a local Seq

// constructing the DataFrame from the raw text
val data = """1   11.6133  48.1075
2   11.6142  48.1066
3   11.6108  48.1061
4   11.6207  48.1192
5   11.6221  48.1223
6   11.5969  48.1276
7   11.5995  48.1258
8   11.6127  48.1066
9   11.6430  48.1275
10  11.6368  48.1278
11  11.5930  48.1156"""
val df = data
    .split("\\s*\\n\\s*")
    .map( _.split("\\s+") match {
        case Array(a, b, c) => (a.toInt,b.toDouble,c.toDouble)
    })
    .toSeq
    .toDF("id", "X", "Y")

// combine X and Y into a single vector column for the LSH
val assembler = new VectorAssembler()
    .setInputCols(Array("X", "Y"))
    .setOutputCol("v")
val df2 = assembler.transform(df)
val lsh = new BucketedRandomProjectionLSH()
    .setInputCol("v")
    .setBucketLength(1e-3) // change that according to your use case
    .setOutputCol("lsh")
val result = lsh.fit(df2).transform(df2).orderBy("lsh")

// The hashes are in an array of vectors. To extract the double value, we use
// getItem for the array and a UDF for the vector.
val extract = udf((vector : org.apache.spark.ml.linalg.Vector) => vector(0))
result.withColumn("lsh", extract(col("lsh").getItem(0))).show(false)
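
If you would rather find explicit pairs of nearby points than tune the bucket length, the fitted model also offers an approximate similarity self-join. A sketch under the assumption that a Euclidean distance threshold of 0.005 fits this data; the threshold value is not taken from the original answer and should be adjusted to your use case:

// approximate self-join: all pairs of points closer than the threshold
// (the 0.005 threshold is an assumption)
val model = lsh.fit(df2)
val pairs = model.approxSimilarityJoin(df2, df2, 0.005, "dist")
  .filter(col("datasetA.id") =!= col("datasetB.id"))   // drop self-matches
  .select(col("datasetA.id") as "idA", col("datasetB.id") as "idB", col("dist"))
pairs.show(false)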