I have an input dataframe which contains an array-typed column. Each entry in the array is a struct consisting of a key (one of about four values) and a value. I want to turn this into a dataframe with one column for each possible key, and nulls where that value is not in the array for that row. Keys are never duplicated in any of the arrays, but they may be out of order or missing.
The best I've come up with so far is
val wantedCols = df.columns
  .filter(_ != arrayCol)
  .filter(_ != "col")
val flattened = df
  .select((wantedCols.map(col(_)) ++ Seq(explode(col(arrayCol)))):_*)
  .groupBy(wantedCols.map(col(_)):_*)
  .pivot("col.key")
  .agg(first("col.value"))
This does exactly what I want, but it's hideous, and I have no idea what the consequences of grouping on every column but one are. What's the RIGHT way to do this?
EDIT: Example input/output:
case class testStruct(name : String, number : String)
val dfExampleInput = Seq(
    (0, "KY", Seq(testStruct("A", "45"))),
    (1, "OR", Seq(testStruct("A", "30"), testStruct("B", "10"))))
  .toDF("index", "state", "entries")
  .show
+-----+-----+------------------+
|index|state| entries|
+-----+-----+------------------+
| 0| KY| [[A, 45]]|
| 1| OR|[[A, 30], [B, 10]]|
+-----+-----+------------------+
val dfExampleOutput = Seq(
    (0, "KY", "45", null),
    (1, "OR", "30", "10"))
  .toDF("index", "state", "A", "B")
  .show
+-----+-----+---+----+
|index|state| A| B|
+-----+-----+---+----+
| 0| KY| 45|null|
| 1| OR| 30| 10|
+-----+-----+---+----+
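For reference, here is the same reshaping expressed over plain Scala collections (an illustrative sketch, not Spark; `possibleKeys` and the other names are mine, not part of the actual dataframe):

```scala
// Plain-Scala sketch of the desired reshaping: one Option-valued slot per
// possible key, None when that key is absent from the row's entries.
val possibleKeys = Seq("A", "B")
val input = Seq(
  (0, "KY", Seq("A" -> "45")),
  (1, "OR", Seq("A" -> "30", "B" -> "10")))

val reshaped = input.map { case (index, state, entries) =>
  val byKey = entries.toMap                   // safe: keys never repeat in an array
  (index, state, possibleKeys.map(byKey.get)) // Some(value) per present key, else None
}
// reshaped == Seq(
//   (0, "KY", Seq(Some("45"), None)),
//   (1, "OR", Seq(Some("30"), Some("10"))))
```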
I wouldn't worry too much about grouping by a lot of columns, other than it potentially making things confusing. In that vein, if there is a simpler, more maintainable way, go for it. Without example input/output, I'm not sure whether this gets you where you're going, but maybe it will be of use:
Seq(Seq("k1" -> "v1", "k2" -> "v2")).toDS() // some basic input based on my understanding of your description
  .select(explode($"value"))                // flatten the array
  .select("col.*")                          // de-nest the struct
  .groupBy("_2")                            // one row per distinct value
  .pivot("_1")                              // one column per distinct key
  .count                                    // or agg(first) if you want the value in each column
  .show
+---+----+----+
| _2| k1| k2|
+---+----+----+
| v2|null| 1|
| v1| 1|null|
+---+----+----+
Based on what you've now said, I get the impression that there are many columns like "state" that aren't required for the aggregation, but need to be in the final result. For reference, if you didn't need to pivot, you could add a struct column with all such fields nested within, then add it to your aggregation, e.g.:
.agg(first($"myStruct"), first($"number"))
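As a plain-Scala picture of why that trick works (the data here is the exploded form of the example input; names are illustrative): the non-key fields are constant within each group, so taking them from whichever row comes first, which is what first does, recovers them intact:

```scala
// Group only on the key column; everything else rides along and is recovered
// from the first row of each group (the analogue of first(myStruct)).
val rows = Seq((0, "KY", "A", "45"), (1, "OR", "A", "30"), (1, "OR", "B", "10"))

val result = rows
  .groupBy { case (index, _, _, _) => index }  // only the key column in the groupBy
  .map { case (index, group) =>
    val (_, state, _, _) = group.head          // "first": state is constant per group
    (index, state, group.map { case (_, _, k, v) => k -> v }.toMap)
  }
```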
The primary advantage is that only the actual key columns are referenced in the groupBy. But when using pivot, things get a little weird, so we'll set that option aside. The simplest way I can think of in this use case involves splitting the dataframe and joining it back together after the aggregation, using some row key. In this example I am assuming that "index" is suitable for that purpose:
val mehCols = dfExampleInput.columns.filter(_ != "entries").map(col)
val mehDF = dfExampleInput.select(mehCols:_*)
val aggDF = dfExampleInput
  .select($"index", explode($"entries").as("entry"))
  .select($"index", $"entry.*")
  .groupBy("index")
  .pivot("name")
  .agg(first($"number"))

scala> mehDF.join(aggDF, Seq("index")).show
+-----+-----+---+----+
|index|state|  A|   B|
+-----+-----+---+----+
|    0|   KY| 45|null|
|    1|   OR| 30|  10|
+-----+-----+---+----+
I doubt you would see much of a difference in performance, if any. Maybe in extreme cases (say, a very large number of meh columns, or a very large number of pivot columns, or something like that), or maybe nothing at all. Personally, I would test both with decently sized input, and if there's no significant difference, use whichever one seems easier to maintain.
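The split-and-rejoin shape, miniaturized over plain Scala collections (a sketch using the example data, with index as the row key; names are mine):

```scala
// Keep the "meh" columns aside keyed by index, pivot the entries separately,
// then join the two halves back together on the row key.
val input = Seq(
  (0, "KY", Seq("A" -> "45")),
  (1, "OR", Seq("A" -> "30", "B" -> "10")))
val keys = Seq("A", "B")

val mehByIndex = input.map { case (index, state, _) => index -> state }.toMap
val aggByIndex = input.map { case (index, _, entries) => index -> entries.toMap }.toMap

val joined = input.map(_._1).sorted.map { index =>
  (index, mehByIndex(index)) -> keys.map(k => aggByIndex(index).get(k))
}
// joined == Seq(
//   ((0, "KY"), Seq(Some("45"), None)),
//   ((1, "OR"), Seq(Some("30"), Some("10"))))
```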
Here is a way that uses none of groupBy, pivot, agg, or first. Please check the code below.
scala> val df = Seq(
     |     (0, "KY", Seq(("A", "45"))),
     |     (1, "OR", Seq(("A", "30"), ("B", "10"))))
     |   .toDF("index", "state", "entries")
     |   .withColumn("entries", $"entries".cast("array<struct<name:string,number:string>>"))
df: org.apache.spark.sql.DataFrame = [index: int, state: string ... 1 more field]
scala> df.printSchema
root
|-- index: integer (nullable = false)
|-- state: string (nullable = true)
|-- entries: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- number: string (nullable = true)
scala> df.show(false)
+-----+-----+------------------+
|index|state|entries |
+-----+-----+------------------+
|0 |KY |[[A, 45]] |
|1 |OR |[[A, 30], [B, 10]]|
+-----+-----+------------------+
scala> val finalDFColumns = df.select(explode($"entries").as("entries"))
     |   .select("entries.*")
     |   .select("name")
     |   .distinct
     |   .map(_.getAs[String](0))
     |   .orderBy($"value".asc)
     |   .collect
     |   .foldLeft(df.limit(0))((cdf, c) => cdf.withColumn(c, lit(null)))
     |   .columns
finalDFColumns: Array[String] = Array(index, state, entries, A, B)

scala> // the maximum array length; this definition appears to have been dropped
scala> // from the transcript (functions.max is qualified to avoid the val name)
scala> val max = df.select(org.apache.spark.sql.functions.max(size($"entries"))).collect()(0).getInt(0)
max: Int = 2

scala> // note: entries are picked by position i and renamed afterwards, so this
scala> // assumes each row's entries already appear in key order
scala> val finalDF = df.select($"*" +: (0 until max).map(i => $"entries".getItem(i)("number").as(i.toString)): _*)
finalDF: org.apache.spark.sql.DataFrame = [index: int, state: string ... 3 more fields]
scala> finalDF.show(false)
+-----+-----+------------------+---+----+
|index|state|entries |0 |1 |
+-----+-----+------------------+---+----+
|0 |KY |[[A, 45]] |45 |null|
|1 |OR |[[A, 30], [B, 10]]|30 |10 |
+-----+-----+------------------+---+----+
scala> finalDF.printSchema
root
|-- index: integer (nullable = false)
|-- state: string (nullable = true)
|-- entries: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- name: string (nullable = true)
| | |-- number: string (nullable = true)
|-- 0: string (nullable = true)
|-- 1: string (nullable = true)
Final output:

scala> finalDF.columns.zip(finalDFColumns)
     |   .foldLeft(finalDF)((fdf, column) => fdf.withColumnRenamed(column._1, column._2))
     |   .show(false)
+-----+-----+------------------+---+----+
|index|state|entries           |A  |B   |
+-----+-----+------------------+---+----+
|0    |KY   |[[A, 45]]         |45 |null|
|1    |OR   |[[A, 30], [B, 10]]|30 |10  |
+-----+-----+------------------+---+----+