我有一个如下所示的数据集:
+---+
|col|
+---+
| a|
| b|
| c|
| d|
| e|
| f|
| g|
+---+
我想重新格式化此数据集,以便将行聚合到固定长度的数组中,如下所示:
+------+
| col|
+------+
|[a, b]|
|[c, d]|
|[e, f]|
| [g]|
+------+
我试过这个:
spark.sql("select collect_list(col) from (select col, row_number() over (order by col) row_number from dataset) group by floor(row_number/2)")
但问题是我的实际数据集太大,无法在 row_number() 的单个分区中处理
当您希望分发此内容时,需要执行几个步骤。
如果您希望运行代码,我将从这里开始:
var df = List(
"a", "b", "c", "d", "e", "f", "g"
).toDF("col")
val desiredArrayLength = 2
首先,将数据帧拆分为一个可以在单个节点上处理的小数据帧,以及一个较大的数据帧,其行数是所需数组大小的倍数(在您的示例中,这是 2)
val nRowsPrune = 1 //number of rows to prune such that remaining dataframe has number of
// rows is multiples of the desired length of array
val dfPrune = df.sort(desc("col")).limit(nRowsPrune)
df = df.join(dfPrune,Seq("col"),"left_anti") //separate small from large dataframe
通过构建,您可以将原始代码应用在小型数据框上,
val groupedPruneDf = dfPrune//.withColumn("g",floor((lit(-1)+row_number().over(w))/lit(desiredArrayLength ))) //added -1 as row-number starts from 1
//.groupBy("g")
.agg( collect_list("col").alias("col"))
.select("col")
现在,我们需要找到一种方法来处理剩余的大型数据帧。但是,现在我们确定 df 的行数是数组大小的倍数。 这是我们使用一个很棒的技巧的地方,那就是使用
repartitionByRange
重新分区。基本上,分区保证保留排序,并且在分区时每个分区将具有相同的大小。
您现在可以收集每个分区中的每个数组,
val nRows = df.count()
val maxNRowsPartition = desiredArrayLength //make sure its a multiple of desired array length
val nPartitions = math.max(1,math.floor(nRows/maxNRowsPartition) ).toInt
df = df.repartitionByRange(nPartitions, $"col".desc)
.withColumn("partitionId",spark_partition_id())
val w = Window.partitionBy($"partitionId").orderBy("col")
val groupedDf = df
.withColumn("g", floor( (lit(-1)+row_number().over(w))/lit(desiredArrayLength ))) //added -1 as row-number starts from 1
.groupBy("partitionId","g")
.agg( collect_list("col").alias("col"))
.select("col")
最终将两个结果结合起来就得到了您想要的结果,
val result = groupedDf.union(groupedPruneDf)
result.show(truncate=false)
今天遇到同样的问题,对于简单的情况,这可能是一个更干净的解决方案。
输入:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
val partitionCount = 1 // more if can be partition by key to parallelize
var df = List("a", "b", "c", "d", "e", "f", "g")
.toDF("col")
.withColumn("row_num", monotonically_increasing_id()) // if order matters
df.show()
结果:
+---+-------+
|col|row_num|
+---+-------+
| a| 0|
| b| 1|
| c| 2|
| d| 3|
| e| 4|
| f| 5|
| g| 6|
+---+-------+
定义分组和可选附加转换的函数
def processByPatch(iter: Iterator[Row]): Iterator[List[String]] = {
iter
.grouped(2)
.map{grouped =>
grouped.map{item =>
item.getString(0)
}.toList
}
}
重新分区和mapPartitions:
df
.repartition(partitionCount, $"col")
.sortWithinPartitions($"row_num") // if order matters
.mapPartitions{iter: Iterator[Row] =>
processByPatch(iter)
}
.show()
结果:
+------+
| value|
+------+
|[a, b]|
|[c, d]|
|[e, f]|
| [g]|
+------+