斯卡拉 - 如何选择从RDD的最后一个元素?

问题描述 投票:0回答:2

首先我有一个salesList: List[Sale],为了获得最后的出售在我使用lastOption列表的ID:

val lastSaleId: Option[Any] = salesList.lastOption.map(_.saleId)

但现在我已经修改与List[Sale]的方法与salesListRdd: List[RDD[Sale]]工作。所以我改变了我得到的最后一次拍卖的ID的方法:

  val lastSaleId: Option[Any] = SparkContext
    .union(salesListRdd)
    .collect().toList
    .lastOption.map(_.saleId)

我不知道它是最好的一段路要走。因为在这里我还在收集RDD到它带给驾驶员节点列表,它可能会导致驱动程序运行内存不足。

有没有办法让最后的销售从RDD保存记录的初始订单的ID?没有任何一种排序,但销售对象最初被存储在列表的方式?

scala apache-spark rdd
2个回答
3
投票

有至少两个有效的解决方案。您可以使用topzipWithIndex

def lastValue[T](rdd: RDD[T]): Option[T] = {
  rdd.zipWithUniqueId.map(_.swap).top(1)(Ordering[Long].on(_._1)).headOption.map(_._2)
}

或自定义密钥top

 def lastValue[T](rdd: RDD[T]): Option[T] = {
   rdd.mapPartitionsWithIndex(
     (i, iter) => iter.zipWithIndex.map {  case (x, j) => ((i, j), x) }
   ).top(1)(Ordering[(Int, Long)].on(_._1)).headOption.map(_._2)
 }

前者需要zipWithIndex额外的操作,而后者没有。

请在使用前一定要了解的局限性。 Quoting the docs

请注意,某些RDDS,如根据groupby()返回,并不能保证在一个分区要素的顺序。因此分配给每个元件的唯一ID不能保证,并且如果RDD重新评估甚至可能改变。如果一个固定的订货要求,以保证同一指标的分配,你应该用排序sortByKey(在RDD)或将其保存到一个文件中。

特别是,根据确切的输入,Union可能不会保留输入顺序可言。


1
投票

您可以通过它使用zipWithIndex和排序descending,使最后一条记录将是在顶部,然后采取(1):

salesListRdd
    .zipWithIndex()
    .map({ case (x, y) => (y, x) })
    .sortByKey(ascending = false)
    .map({ case (x, y) => y })
    .take(1)

解决办法就是从这里取:http://www.swi.com/spark-rdd-getting-bottom-records/然而,这是非常低效的,因为它确实很多分区洗牌。

© www.soinside.com 2019 - 2024. All rights reserved.