我有一个看起来像这样的数据框:
country | user | count
----------------------
Germany | Sarah| 2
China | Paul | 1
Germany | Alan | 3
Germany | Paul | 1
...
我正在尝试将这个数据帧转换为另一个看起来像这样的数据:
dimension | value
--------------------------------------------
Country | [Germany -> 4, China -> 1]
--------------------------------------------
User | [Sarah -> 2, Paul -> 2, Alan -> 3]
...
起初,我试图这样做:
var newDF = Seq.empty[(String, Map[String,Long])].toDF("dimension", "value")
df.collect()
.foreach(row => { Array(0,1)
.map(pos =>
newDF = newDF.union(Seq((df.columns.toSeq(pos).toString, Map(row.mkString(",").split(",")(pos) -> row.mkString(",").split(",")(2).toLong))).toDF())
)
})
val newDF2 = newDF.groupBy("dimension").agg(collect_list("value")).as[(String, Seq[Map[String, Long]])].map {case (id, list) => (id, list.reduce(_ |+| _))}.toDF("dimension", "value")
但是collect()
正在杀死我的驾驶员。因此,我尝试这样做:
class DimItem[T](val dimension: String, val value: String, val metric: T)
val items: RDD[DimItem[Long]] = df.rdd.flatMap(row => {
dims.zipWithIndex.map{case (dim, i) =>
new DimItem(dim, row(i).toString, row(13).asInstanceOf[Long])
}
})
// with the format [ DimItem(Country, Germany, 2), DimItem(User, Sarah, 2)], ...
val itemsGrouped: RDD[((String, String), Iterable[DimItem[Long]])] = items.groupBy(x => (x.dimension, x.value))
val aggregatedItems: RDD[DimItem[Long]] = itemsGrouped.map{case (key, items) => new DimItem(key._1, key._2, items.reduce((a,b) => a.metric + b.metric)}
这个想法是在RDD对象中保存(Country,China,1),(Country,Germany,3),(Country,Germany,1),...,然后通过2个第一个键(Country)将其分组,中国),(国家/地区,德国),...分组后,求和。例如:拥有(Country,Germany,3),(Country,Germany,1)会变成(Country,Germany,4)。
但是一旦到达这里,它告诉我items.reduce()
中存在不匹配:它期望DimItem [Long]但会得到Long。
下一步将通过键“维度”将其分组,并在“值”列中创建Map[String, Int]()
格式,并将其转换为DF。
我有2个问题。
First:这最后的代码正确吗?
第二:如何将该MapPartitionsRDD转换为DF?
这里是一种完全基于数据帧API的解决方案: