首先我有一个salesList: List[Sale]
,为了获得最后的出售在我使用lastOption
列表的ID:
val lastSaleId: Option[Any] = salesList.lastOption.map(_.saleId)
但现在我已经修改与List[Sale]
的方法与salesListRdd: List[RDD[Sale]]
工作。所以我改变了我得到的最后一次拍卖的ID的方法:
val lastSaleId: Option[Any] = SparkContext
.union(salesListRdd)
.collect().toList
.lastOption.map(_.saleId)
我不知道它是最好的一段路要走。因为在这里我还在收集RDD到它带给驾驶员节点列表,它可能会导致驱动程序运行内存不足。
有没有办法让最后的销售从RDD保存记录的初始订单的ID?没有任何一种排序,但销售对象最初被存储在列表的方式?
有至少两个有效的解决方案。您可以使用top
与zipWithIndex
:
def lastValue[T](rdd: RDD[T]): Option[T] = {
rdd.zipWithUniqueId.map(_.swap).top(1)(Ordering[Long].on(_._1)).headOption.map(_._2)
}
或自定义密钥top
:
def lastValue[T](rdd: RDD[T]): Option[T] = {
rdd.mapPartitionsWithIndex(
(i, iter) => iter.zipWithIndex.map { case (x, j) => ((i, j), x) }
).top(1)(Ordering[(Int, Long)].on(_._1)).headOption.map(_._2)
}
前者需要zipWithIndex
额外的操作,而后者没有。
请在使用前一定要了解的局限性。 Quoting the docs:
请注意,某些RDDS,如根据groupby()返回,并不能保证在一个分区要素的顺序。因此分配给每个元件的唯一ID不能保证,并且如果RDD重新评估甚至可能改变。如果一个固定的订货要求,以保证同一指标的分配,你应该用排序sortByKey(在RDD)或将其保存到一个文件中。
特别是,根据确切的输入,Union
可能不会保留输入顺序可言。
您可以通过它使用zipWithIndex
和排序descending
,使最后一条记录将是在顶部,然后采取(1):
salesListRdd
.zipWithIndex()
.map({ case (x, y) => (y, x) })
.sortByKey(ascending = false)
.map({ case (x, y) => y })
.take(1)
解决办法就是从这里取:http://www.swi.com/spark-rdd-getting-bottom-records/然而,这是非常低效的,因为它确实很多分区洗牌。