Apache spark:对的RDD示例

问题描述 投票:0回答:1

我有一个RDD的项目,以及一个函数d: (Item, Item) => Double,它计算两个项目之间的距离。我试图计算从RDD随机抽取的项目之间的平均距离。 RDD相当大(100万),因此计算精确平均值是不可能的。

因此,我想得到一个采样对项目的RDD(我将从中计算距离)。例如,我想得到100米对的样本。给定采样对的RDD,然后我将计算平均值,直方图等以便理解距离分布。

以下是所有失败的初始尝试:

  1. 使用.sample生成两个RDD,压缩它们并计算项目之间的距离。这失败了,因为.zip要求两个RDD每个分区具有完全相同的项目数。
  2. 使用RDD的.cartesian本身,然后使用.sample。这失败了(内存不足),因为显然cartesian并不意味着以这种方式使用。
  3. 收集RDD的两个小样本,并将.zip收集到两个阵列中。这很好但但不能扩展。

有任何想法吗?

谢谢!


编辑:这里是如何压缩每个分区具有不同数量的项目的两个样本:

val r = ... // RDD[Item]
val s1 = r.sample(true, 0.1, 123)
val s2 = r.sample(true, 0.1, 456)
val zipper = (i1: Iterator[Item], i2: Iterator[Item]) => i1.zip(i2)
val pairs = r1.zipPartitions(r2)(zipper) // zip the RDDs and explicitly define how to zip the partitions

关键是虽然RDD的.zip方法不接受大小不等的分区,但迭代器的.zip方法会做(并丢弃较长迭代器的剩余部分)。

apache-spark random rdd
1个回答
1
投票

回答我自己的问题:

  1. 获取rdd的样本(替换),
  2. 使用.sliding(2)获得连续的样本对。

码:

import org.apache.spark.mllib.rdd.RDDFunctions._ // for .sliding
val x = ... // RDD[Item]
val xSize = x.count
val n = 1000000.0 // (approximate) desired sample size
val pairs = x.sample(true, n/xSize).sliding(2)
val distances = pairs.map(arr => dist(arr(0), arr(1)))
© www.soinside.com 2019 - 2024. All rights reserved.