Spark Rdd - 使用具有多个列值的sortBy

问题描述 投票:-1回答:1

在对我的数据集进行分组后,它看起来像这样

(AD_PRES,1)
(AD_VP,2)
(FI_ACCOUNT,5)
(FI_MGR,1)
(IT_PROG,5)
(PU_CLERK,5)
(PU_MAN,1)
(SA_MAN,5)
(ST_CLERK,20)
(ST_MAN,5)

在这里,我希望按键排序为降序,值按升序排序。所以尝试下面的代码行。

 emp_data.map(s => (s.JOB_ID, s.FIRST_NAME.concat(",").concat(s.LAST_NAME))).groupByKey().map({
    case (x, y) => (x, y.toList.size)
  }).sortBy(s => (s._1, s._2))(Ordering.Tuple2(Ordering.String.reverse, Ordering.Int.reverse))

它导致以下异常。

not enough arguments for expression of type (implicit ord: Ordering[(String, Int)], implicit ctag: scala.reflect.ClassTag[(String, Int)])org.apache.spark.rdd.RDD[(String, Int)]. Unspecified value parameter ctag.
scala apache-spark rdd
1个回答
3
投票

RDD.sortBy将排序和类标记作为隐式参数。

def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] 

你不能只提供这些的一部分,并期望事情发挥作用。相反,您可以提供块本地隐式排序:

{ 
   implicit val ord = Ordering.Tuple2[String, Int](Ordering.String.reverse, Ordering.Int.reverse)
   emp_data.map(s => (s.JOB_ID, s.FIRST_NAME.concat(",").concat(s.LAST_NAME))).groupByKey().map({
     case (x, y) => (x, y.toList.size)
   }).sortBy(s => (s._1, s._2))
}

虽然在这种情况下你真的应该使用reduceByKey而不是groupByKey

© www.soinside.com 2019 - 2024. All rights reserved.