我有一个RDD的项目,以及一个函数d: (Item, Item) => Double
,它计算两个项目之间的距离。我试图计算从RDD随机抽取的项目之间的平均距离。 RDD相当大(100万),因此计算精确平均值是不可能的。
因此,我想得到一个采样对项目的RDD(我将从中计算距离)。例如,我想得到100米对的样本。给定采样对的RDD,然后我将计算平均值,直方图等以便理解距离分布。
以下是所有失败的初始尝试:
.sample
生成两个RDD,压缩它们并计算项目之间的距离。这失败了,因为.zip
要求两个RDD每个分区具有完全相同的项目数。.cartesian
本身,然后使用.sample
。这失败了(内存不足),因为显然cartesian
并不意味着以这种方式使用。.zip
收集到两个阵列中。这很好但但不能扩展。有任何想法吗?
谢谢!
编辑:这里是如何压缩每个分区具有不同数量的项目的两个样本:
val r = ... // RDD[Item]
val s1 = r.sample(true, 0.1, 123)
val s2 = r.sample(true, 0.1, 456)
val zipper = (i1: Iterator[Item], i2: Iterator[Item]) => i1.zip(i2)
val pairs = r1.zipPartitions(r2)(zipper) // zip the RDDs and explicitly define how to zip the partitions
关键是虽然RDD的.zip方法不接受大小不等的分区,但迭代器的.zip方法会做(并丢弃较长迭代器的剩余部分)。
回答我自己的问题:
.sliding(2)
获得连续的样本对。码:
import org.apache.spark.mllib.rdd.RDDFunctions._ // for .sliding
val x = ... // RDD[Item]
val xSize = x.count
val n = 1000000.0 // (approximate) desired sample size
val pairs = x.sample(true, n/xSize).sliding(2)
val distances = pairs.map(arr => dist(arr(0), arr(1)))