将MapPartitionsRDD转换为DataFrame并通过2个键将数据分组

问题描述 投票:1回答:1

我有一个看起来像这样的数据框:

  country | user | count
  ----------------------
  Germany | Sarah| 2
  China   | Paul | 1
  Germany | Alan | 3
  Germany | Paul | 1
          ...

我正在尝试将这个数据帧转换为另一个看起来像这样的数据:

  dimension | value
  --------------------------------------------
  Country   | [Germany -> 4, China -> 1]
  --------------------------------------------
  User      | [Sarah -> 2, Paul -> 2, Alan -> 3]
          ...

起初,我试图这样做:

  var newDF = Seq.empty[(String, Map[String,Long])].toDF("dimension", "value")
  df.collect()
    .foreach(row => { Array(0,1)
            .map(pos => 
             newDF = newDF.union(Seq((df.columns.toSeq(pos).toString, Map(row.mkString(",").split(",")(pos) -> row.mkString(",").split(",")(2).toLong))).toDF())
             )
     })
  val newDF2 = newDF.groupBy("dimension").agg(collect_list("value")).as[(String, Seq[Map[String, Long]])].map {case (id, list) => (id, list.reduce(_ |+| _))}.toDF("dimension", "value")

但是collect()正在杀死我的驾驶员。因此,我尝试这样做:

 class DimItem[T](val dimension: String, val value: String, val metric: T) 


 val items: RDD[DimItem[Long]] = df.rdd.flatMap(row => {
                                dims.zipWithIndex.map{case (dim, i) => 
                                                  new DimItem(dim, row(i).toString, row(13).asInstanceOf[Long])
                                                  }
                                })  
 // with the format [ DimItem(Country, Germany, 2), DimItem(User, Sarah, 2)], ...

val itemsGrouped: RDD[((String, String), Iterable[DimItem[Long]])] = items.groupBy(x => (x.dimension, x.value))
val aggregatedItems: RDD[DimItem[Long]] = itemsGrouped.map{case (key, items) => new DimItem(key._1, key._2, items.reduce((a,b) => a.metric + b.metric)}

这个想法是在RDD对象中保存(Country,China,1),(Country,Germany,3),(Country,Germany,1),...,然后通过2个第一个键(Country)将其分组,中国),(国家/地区,德国),...分组后,求和。例如:拥有(Country,Germany,3),(Country,Germany,1)会变成(Country,Germany,4)。

但是一旦到达这里,它告诉我items.reduce()中存在不匹配:它期望DimItem [Long]但会得到Long。

下一步将通过键“维度”将其分组,并在“值”列中创建Map[String, Int]()格式,并将其转换为DF。

我有2个问题。

First:这最后的代码正确吗?

第二:如何将该MapPartitionsRDD转换为DF?

scala apache-spark rdd
1个回答
0
投票

这里是一种完全基于数据帧API的解决方案:

© www.soinside.com 2019 - 2024. All rights reserved.