Scala Spark:键/值结构的扁平数组

问题描述 投票:0回答:2

我有一个输入数据框,其中包含一个数组类型的列。数组中的每个条目都是一个由键(大约四个值之一)和一个值组成的结构。我想将其转换为一个数据框,其中每个可能的键都有一列,如果该值不在该行的数组中,则为null。密钥永远不会在任何数组中重复,但是它们可能乱序或丢失。

到目前为止,我得到的最好的是

val wantedCols =df.columns
  .filter(_ != arrayCol)
  .filter(_ != "col")
val flattened = df
        .select((wantedCols.map(col(_)) ++ Seq(explode(col(arrayCol)))):_*)
        .groupBy(wantedCols.map(col(_)):_*)
        .pivot("col.key")
        .agg(first("col.value"))

这正是我想要的,但是它很可怕,我也不知道在每个列但每个列上分组的后果是什么。什么是正确的方法?

编辑:输入/输出示例:

case class testStruct(name : String, number : String)
val dfExampleInput = Seq(
(0, "KY", Seq(testStruct("A", "45"))),
(1, "OR", Seq(testStruct("A", "30"), testStruct("B", "10"))))
.toDF("index", "state", "entries")
.show

+-----+-----+------------------+
|index|state|           entries|
+-----+-----+------------------+
|    0|   KY|         [[A, 45]]|
|    1|   OR|[[A, 30], [B, 10]]|
+-----+-----+------------------+

val dfExampleOutput = Seq(
  (0, "KY", "45", null),
  (1, "OR", "30", "10"))
  .toDF("index", "state", "A", "B")
  .show

+-----+-----+---+----+
|index|state|  A|   B|
+-----+-----+---+----+
|    0|   KY| 45|null|
|    1|   OR| 30|  10|
+-----+-----+---+----+
scala apache-spark
2个回答
0
投票

我不会担心[[too多按几列分组,除了可能使事情变得混乱之外”。因此,如果有更简单,更可维护的方法,那就去吧。如果没有示例输入/输出,我不确定是否可以将您带到您要去的地方,但是也许会有用:

Seq(Seq("k1" -> "v1", "k2" -> "v2")).toDS() // some basic input based on my understanding of your description .select(explode($"value")) // flatten the array .select("col.*") // de-nest the struct .groupBy("_2") // one row per distinct value .pivot("_1") // one column per distinct key .count // or agg(first) if you want the value in each column .show +---+----+----+ | _2| k1| k2| +---+----+----+ | v2|null| 1| | v1| 1|null| +---+----+----+
根据您现在所说的,我得到的印象是,聚合不需要许多列,例如“ state”,但它们必须位于最终结果中。

作为参考,如果不需要旋转,则可以添加一个结构列,其中嵌套所有此类字段,然后将其添加到聚合中,例如:.agg(first($"myStruct"), first($"number"))。主要优点是仅在groubBy中引用了实际的键列。但是当使用数据透视时,事情会变得有些怪异,因此我们将该选项放在一旁。

在此用例中,我能想到的最简单的方法涉及拆分数据帧,并在使用某些行键聚合之后将其重新连接在一起。在此示例中,我假设"index"适合用于此目的:

val mehCols = dfExampleInput.columns.filter(_ != "entries").map(col) val mehDF = dfExampleInput.select(mehCols:_*) val aggDF = dfExampleInput .select($"index", explode($"entries").as("entry")) .select($"index", $"entry.*") .groupBy("index") .pivot("name") .agg(first($"number")) scala> mehDF.join(aggDF, Seq("index")).show +-----+-----+---+----+ |index|state| A| B| +-----+-----+---+----+ | 0| KY| 45|null| | 1| OR| 30| 10| +-----+-----+---+----+

我怀疑您会发现性能上有很大的不同,如果有的话。也许是极端,例如:meh列很多,或枢轴列很多,或者类似的东西,或者根本没有。就我个人而言,我将使用适当大小的输入进行测试,如果两者之间没有显着差异,请使用似乎更易于维护的任何一种。

0
投票
没有groupBypivotagg first

请检查下面的代码。

scala> val df = Seq((0, "KY", Seq(("A", "45"))),(1, "OR", Seq(("A", "30"),("B", "10")))).toDF("index", "state", "entries").withColumn("entries",$"entries".cast("array<struct<name:string,number:string>>")) df: org.apache.spark.sql.DataFrame = [index: int, state: string ... 1 more field] scala> df.printSchema root |-- index: integer (nullable = false) |-- state: string (nullable = true) |-- entries: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- name: string (nullable = true) | | |-- number: string (nullable = true) scala> df.show(false) +-----+-----+------------------+ |index|state|entries | +-----+-----+------------------+ |0 |KY |[[A, 45]] | |1 |OR |[[A, 30], [B, 10]]| +-----+-----+------------------+ scala> val finalDFColumns = df.select(explode($"entries").as("entries")).select("entries.*").select("name").distinct.map(_.getAs[String](0)).orderBy($"value".asc).collect.foldLeft(df.limit(0))((cdf,c) => cdf.withColumn(c,lit(null))).columns finalDFColumns: Array[String] = Array(index, state, entries, A, B) scala> val finalDF = df.select($"*" +: (0 until max).map(i => $"entries".getItem(i)("number").as(i.toString)): _*) finalDF: org.apache.spark.sql.DataFrame = [index: int, state: string ... 3 more fields] scala> finalDF.show(false) +-----+-----+------------------+---+----+ |index|state|entries |0 |1 | +-----+-----+------------------+---+----+ |0 |KY |[[A, 45]] |45 |null| |1 |OR |[[A, 30], [B, 10]]|30 |10 | +-----+-----+------------------+---+----+ scala> finalDF.printSchema root |-- index: integer (nullable = false) |-- state: string (nullable = true) |-- entries: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- name: string (nullable = true) | | |-- number: string (nullable = true) |-- 0: string (nullable = true) |-- 1: string (nullable = true) scala>

最终输出

scala> finalDF.columns.zip(finalDFColumns).foldLeft(finalDF)((fdf,column) => fdf.withColumnRenamed(column._1,column._2)).show(false) +-----+-----+------------------+---+----+ |index|state|entries |A |B | +-----+-----+------------------+---+----+ |0 |KY |[[A, 45]] |45 |null| |1 |OR |[[A, 30], [B, 10]]|30 |10 | +-----+-----+------------------+---+----+

© www.soinside.com 2019 - 2024. All rights reserved.