I have a Spark DataFrame consisting of three columns:

id | col1 | col2
---+------+-----
x  | p1   | a1
x  | p2   | b1
y  | p2   | b2
y  | p2   | b3
y  | p3   | c1
After applying df.groupBy("id").pivot("col1").agg(collect_list("col2")), I get the following DataFrame (aggDF):
+---+----+--------+----+
| id| p1| p2| p3|
+---+----+--------+----+
| x|[a1]| [b1]| []|
| y| []|[b2, b3]|[c1]|
+---+----+--------+----+
Then I find the names of all columns other than the id column:
val cols = aggDF.columns.filter(x => x != "id")
After that, I use
cols.foldLeft(aggDF)((df, x) => df.withColumn(x, when(size(col(x)) > 0, col(x)).otherwise(lit(null))))
to replace the empty arrays with null. The performance of this code degrades as the number of columns grows. I also have the names of the string columns, val stringColumns = Array("p1","p3"). I want to obtain the following final DataFrame:
+---+----+--------+----+
| id| p1| p2| p3|
+---+----+--------+----+
| x| a1 | [b1]|null|
| y|null|[b2, b3]| c1 |
+---+----+--------+----+
Is there a better solution for producing this final DataFrame?
The question title is slightly off, which doesn't help.
If you look at https://medium.com/@manuzhang/the-hidden-cost-of-spark-withcolumn-8ffea517c015, you will see that withColumn combined with foldLeft has known performance issues. A single select is the alternative approach, using varargs as shown below.
I am not convinced collect_list is a problem. The first set of logic is also retained here.
UPD: improved answer, assuming id is always the first column, as in the question.
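As a small sketch of that claim (reusing the aggDF and cols names from the question; this is illustrative, not a benchmark), comparing the two plans shows where the cost comes from: each withColumn call wraps the plan in another Project node, so the foldLeft version's analyzed plan deepens with every column, while a single select with varargs produces one projection regardless of column count:

```scala
// Sketch: compare plan growth of foldLeft-over-withColumn vs one select.
// Each withColumn adds a Project node to the analyzed plan.
val viaFold = cols.foldLeft(aggDF)((df, c) =>
  df.withColumn(c, when(size(col(c)) === 0, lit(null)).otherwise(col(c))))
viaFold.explain(true) // plan depth grows with the number of columns

// One select: a single Project, however many columns cols contains.
val viaSelect = aggDF.select(
  col("id") +: cols.map(c =>
    when(size(col(c)) === 0, lit(null)).otherwise(col(c)).as(c)): _*)
viaSelect.explain(true)
```

Both produce the same result; only the shape (and analysis cost) of the query plan differs.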
import spark.implicits._
import org.apache.spark.sql.functions._
// Your code & assuming id is the only key column of interest, as in THIS question. More elegant than the 1st posting.
val df = Seq( ("x","p1","a1"), ("x","p2","b1"), ("y","p2","b2"), ("y","p2","b3"), ("y","p3","c1")).toDF("id", "col1", "col2")
val aggDF = df.groupBy("id").pivot("col1").agg(collect_list("col2"))
//aggDF.show(false)
val colsToSelect = aggDF.columns // all columns in this case; the 1st (id) is handled via head & tail
val aggDF2 = aggDF.select(
  col(colsToSelect.head) +:
  colsToSelect.tail.map(c => when(size(aggDF(c)) === 0, lit(null)).otherwise(aggDF(c)).as(c)): _*)
aggDF2.show(false)
This returns:
+---+----+--------+----+
|id |p1 |p2 |p3 |
+---+----+--------+----+
|x |[a1]|[b1] |null|
|y |null|[b2, b3]|[c1]|
+---+----+--------+----+
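The question also asks that the string columns (val stringColumns = Array("p1","p3")) end up as scalar values rather than single-element arrays. As a hedged sketch extending the same single-select idea (assuming Spark 2.4+ for element_at, and the column names from the question), the first element can be pulled out of just those columns in the same pass:

```scala
import org.apache.spark.sql.functions._

val stringColumns = Array("p1", "p3")

// One select: empty arrays become null; the string columns are then
// unwrapped to their single element via element_at (null stays null),
// while the remaining columns keep their array form.
val finalDF = aggDF.select(
  col(aggDF.columns.head) +:
  aggDF.columns.tail.map { c =>
    val nonEmpty = when(size(aggDF(c)) === 0, lit(null)).otherwise(aggDF(c))
    if (stringColumns.contains(c)) element_at(nonEmpty, 1).as(c)
    else nonEmpty.as(c)
  }: _*)
finalDF.show(false)
```

This yields scalars for p1 and p3 and the array for p2, matching the desired final DataFrame, still with a single Project in the plan.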
Also worth a look, by the way: https://lansalo.com/2018/05/13/spark-how-to-add-multiple-columns-in-dataframes-and-how-not-to/