反向重申RDD

问题描述 投票:0回答:2

我有以下类型的纯Scala代码:

import breeze.numerics.log
import spire.random.Dist
import org.apache.commons.math3.distribution.NormalDistribution
import scala.collection.mutable.Buffer


def foo1(zs: Buffer[Double])={
  val S = zs.zip(zs.reverse)
    .map { case (x, y) =>log(x) * log(1 - y) }.sum
  S
}

val x = Dist.uniform(0.0, 1.0).sample[Buffer](10)
val y = x.sortWith(_<_)
val cdf=new NormalDistribution(0, 1)
val z = y.map(x_ => cdf.cumulativeProbability(x_))

foo1(z)

z排序因为cdf正在增加

我想为Spark重写它,但对于RDD数据类型,没有反向方法。我如何为Spark编写此代码?

def foo2(z_rdd: RDD[Double])={
    var S = z_rdd.zip(z_rdd.???)
    .map { case (x, y) =>log(x) * log(1 - y) }.sum
    S
}

其中???function反转z_rdd

scala apache-spark reverse rdd
2个回答
1
投票

如果你试图用自己的反转副本压缩RDD,你应该记住,Spark zip需要两个RDD同等分区:

http://spark.apache.org/docs/latest/api/java/org/apache/spark/rdd/RDD.html#zip-org.apache.spark.rdd.RDD-scala.reflect.ClassTag-

假设两个RDD在每个分区中具有相同数量的分区和相同数量的元素(例如,一个是通过另一个映射制作的)。

因此,完成rdd zip rdd.reversed的方法是:

  1. 如前所述,将zipWithIndex应用于RDD
  2. 以相反的顺序对其进行排序并压缩生成带有索引的RDD
  3. reduceByKeygroupByKey来自步骤1和2的RDD的联合,以索引为关键

我不确定这个食谱是否可以改进。


1
投票

您可以使用zipWithIndex将索引添加到RDD的值,然后按索引反向排序:

z_rdd.zip(
  z_rdd.zipWithIndex()
    .sortBy(_._2, ascending = false)
).map({ case (doubleA, (doubleB, _)) =>
  …
})
© www.soinside.com 2019 - 2024. All rights reserved.