在 Spark RDD 中为另一个 RDD 中的每条记录查找最接近的记录

问题描述 投票:0回答:1

我有两个巨大的 RDD,对于其中一个记录中的每条记录,我需要找到另一个具有相同

key
的物理上最接近的(纬度/经度)点。但是......在每个 RDD 中,都有数百百万条记录具有相同的
key
,所以我无法对
key
进行完全连接。任何距离计算都需要限制在其他数据集中的局部点。

这是我今天所做的一个简化版本(为了清楚起见,并且因为 stackoverflow 中没有内存限制)……但我想看看是否有更好的方法。内存更少,更快,更好?

让我们从如下所示的数据开始:

case class Meta(key: Any, lat: Double, lon: Double, meta: Any)

case class Data(key: Any, lat: Double, lon: Double, data: Any, meta: Option[Any] = None) { 
  def ingestMeta(m: Option[Meta]) = this.copy(meta = m) 
}

val rddMeta: RDD[Meta] = ???
val rddData: RDD[Data] = ???

我尝试本地化距离计算的方式很难解释。对于元(较小的数据集),我将每个记录复制 9 倍,并将它们捕捉到元记录的截断纬度/经度处的截断(整数)纬度/经度,并且每个截断纬度/经度上也有 +/- 1 度。

所以...如果真正的元点是红点,并且每个框都是一个以国际纬度/经度为中心的 1 度 x 1 度框,我会在所有 9 个蓝点处复制元。

请参阅对 Meta 类的修改,其中添加了

threeDegreeBox
方法:

case class Meta(key: Any, lat: Double, lon: Double, meta: Any) {
  def threeDegreeBox: List[((Any, Int, Int), Meta)] = {
      for { 
        lt <- List(lat.toInt - 1, lat.toInt, lat.toInt + 1)
        ln <- List(lon.toInt - 1, lon.toInt, lon.toInt + 1) 
      } yield ((key, lt, ln), this)
    }

  // distance calc
  def haversine(otherLat: Double, otherLon: Double): Double = ???
}

那么这有什么帮助呢?

首先,我将具有相同键的所有元记录的联接减少为仅数据记录 1 到 2 度内具有相同键的元记录。事实上,我不想要任何哪怕是 1 度的距离。 60公里已经足够了,所以回来的候选人还是太多了,但总比错过一个好。

因此,我在这里收集带有给定键的每个元记录在整数化位置。

// RDD[((key, truncLat, truncLon), Iterable[Meta])]
val rddMetaBoxed: RDD[((Any, Int, Int), Iterable[Meta])] =
  rddMeta
    .flatMap(_.threeDegreeBox)
    .groupByKey

然后我也截断数据中的纬度/经度以加入。由于我已将 Meta 9x 复制到周围位置,因此此连接应将 150 公里左右的范围内的所有内容拉入其中。然后,我只需迭代所有元记录并对每个记录执行距离计算(在实际的、未截断的纬度/经度上)以找到最接近的并摄取元。

// perform the same truncation on rddData to join
val result: RDD[Meta] = 
  rddData
    .keyBy( d => (d.key, d.lat.toInt, d.lon.toInt) )
    .leftOuterJoin( rddMetaBoxed ) // maybe inner join and dump non-matches
    .map { case ((key, lt, ln), (data, optionIterableMeta)) => 
      // use the distance calc to find the closest Meta in the Iterable
      val closest: Option[Meta] = ???
      data.ingestMeta(closest) 
    }

它可以工作,但是实际运行它很麻烦,我需要添加一堆黑客的东西来避免内存问题。必须有一种更简单/更智能的方法来做到这一点。有人有什么想法/建议吗?

谢谢。

scala apache-spark geolocation geospatial rdd
1个回答
0
投票

您可以使用 Geohash 来实现此目的。

它与您的

(Any, Int, Int)
键具有相同的属性,只不过它是一个字符串,具有明确定义的属性。

因此,取一个

RDD[Meta]
并将其转换为
RDD[(Meta, Seq[String])]
flatMap 转换为
RDD[(String, Meta)]
groupBy 将字符串转换为
RDD[(String, Seq[Meta])
并使用连接和过滤器来查找最近的点。

它显然与您想出的非常相似,除了更简单的密钥

© www.soinside.com 2019 - 2024. All rights reserved.