我有两个巨大的 RDD,对于其中一个记录中的每条记录,我需要找到另一个具有相同
key
的物理上最接近的(纬度/经度)点。但是......在每个 RDD 中,都有数百百万条记录具有相同的 key
,所以我无法对 key
进行完全连接。任何距离计算都需要限制在其他数据集中的局部点。
这是我今天所做的一个简化版本(为了清楚起见,并且因为 stackoverflow 中没有内存限制)……但我想看看是否有更好的方法。内存更少,更快,更好?
让我们从如下所示的数据开始:
case class Meta(key: Any, lat: Double, lon: Double, meta: Any)
case class Data(key: Any, lat: Double, lon: Double, data: Any, meta: Option[Any] = None) {
def ingestMeta(m: Option[Meta]) = this.copy(meta = m)
}
val rddMeta: RDD[Meta] = ???
val rddData: RDD[Data] = ???
我尝试本地化距离计算的方式很难解释。对于元(较小的数据集),我将每个记录复制 9 倍,并将它们捕捉到元记录的截断纬度/经度处的截断(整数)纬度/经度,并且每个截断纬度/经度上也有 +/- 1 度。
所以...如果真正的元点是红点,并且每个框都是一个以国际纬度/经度为中心的 1 度 x 1 度框,我会在所有 9 个蓝点处复制元。
请参阅对 Meta 类的修改,其中添加了
threeDegreeBox
方法:
case class Meta(key: Any, lat: Double, lon: Double, meta: Any) {
def threeDegreeBox: List[((Any, Int, Int), Meta)] = {
for {
lt <- List(lat.toInt - 1, lat.toInt, lat.toInt + 1)
ln <- List(lon.toInt - 1, lon.toInt, lon.toInt + 1)
} yield ((key, lt, ln), this)
}
// distance calc
def haversine(otherLat: Double, otherLon: Double): Double = ???
}
那么这有什么帮助呢?
首先,我将具有相同键的所有元记录的联接减少为仅数据记录 1 到 2 度内具有相同键的元记录。事实上,我不想要任何哪怕是 1 度的距离。 60公里已经足够了,所以回来的候选人还是太多了,但总比错过一个好。
因此,我在这里收集带有给定键的每个元记录在整数化位置。
// RDD[((key, truncLat, truncLon), Iterable[Meta])]
val rddMetaBoxed: RDD[((Any, Int, Int), Iterable[Meta])] =
rddMeta
.flatMap(_.threeDegreeBox)
.groupByKey
然后我也截断数据中的纬度/经度以加入。由于我已将 Meta 9x 复制到周围位置,因此此连接应将 150 公里左右的范围内的所有内容拉入其中。然后,我只需迭代所有元记录并对每个记录执行距离计算(在实际的、未截断的纬度/经度上)以找到最接近的并摄取元。
// perform the same truncation on rddData to join
val result: RDD[Meta] =
rddData
.keyBy( d => (d.key, d.lat.toInt, d.lon.toInt) )
.leftOuterJoin( rddMetaBoxed ) // maybe inner join and dump non-matches
.map { case ((key, lt, ln), (data, optionIterableMeta)) =>
// use the distance calc to find the closest Meta in the Iterable
val closest: Option[Meta] = ???
data.ingestMeta(closest)
}
它可以工作,但是实际运行它很麻烦,我需要添加一堆黑客的东西来避免内存问题。必须有一种更简单/更智能的方法来做到这一点。有人有什么想法/建议吗?
谢谢。
您可以使用 Geohash 来实现此目的。
它与您的
(Any, Int, Int)
键具有相同的属性,只不过它是一个字符串,具有明确定义的属性。
因此,取一个
RDD[Meta]
并将其转换为 RDD[(Meta, Seq[String])]
flatMap 转换为 RDD[(String, Meta)]
groupBy 将字符串转换为 RDD[(String, Seq[Meta])
并使用连接和过滤器来查找最近的点。
它显然与您想出的非常相似,除了更简单的密钥