Spark:如何将行分组到固定大小的数组中?

问题描述 投票:0回答:2

我有一个如下所示的数据集:

+---+
|col|
+---+
|  a|
|  b|
|  c|
|  d|
|  e|
|  f|
|  g|
+---+

我想重新格式化此数据集,以便将行聚合到固定长度的数组中,如下所示:

+------+
|   col|
+------+
|[a, b]|
|[c, d]|
|[e, f]|
|   [g]|
+------+

我试过这个:

spark.sql("select collect_list(col) from (select col, row_number() over (order by col) row_number from dataset) group by floor(row_number/2)")

但问题是我的实际数据集太大,无法在 row_number() 的单个分区中处理

scala apache-spark apache-spark-sql partitioning
2个回答
0
投票

当您希望分发此内容时,需要执行几个步骤。

如果您希望运行代码,我将从这里开始:

var df = List(
  "a", "b", "c", "d", "e", "f", "g"
).toDF("col")
val desiredArrayLength = 2

首先,将数据帧拆分为一个可以在单个节点上处理的小数据帧,以及一个较大的数据帧,其行数是所需数组大小的倍数(在您的示例中,这是 2)

val nRowsPrune = 1 //number of rows to prune such that remaining dataframe has number of
                   // rows is multiples of the desired length of array
val dfPrune = df.sort(desc("col")).limit(nRowsPrune)
df = df.join(dfPrune,Seq("col"),"left_anti") //separate small from large dataframe

通过构建,您可以将原始代码应用在小型数据框上,

val groupedPruneDf = dfPrune//.withColumn("g",floor((lit(-1)+row_number().over(w))/lit(desiredArrayLength ))) //added -1 as row-number starts from 1
                            //.groupBy("g")
                            .agg( collect_list("col").alias("col"))
                            .select("col")

现在,我们需要找到一种方法来处理剩余的大型数据帧。但是,现在我们确定 df 的行数是数组大小的倍数。 这是我们使用一个很棒的技巧的地方,那就是使用

repartitionByRange
重新分区。基本上,分区保证保留排序,并且在分区时每个分区将具有相同的大小。 您现在可以收集每个分区中的每个数组,

   val nRows = df.count()
   val maxNRowsPartition = desiredArrayLength //make sure its a multiple of desired array length
   val nPartitions = math.max(1,math.floor(nRows/maxNRowsPartition) ).toInt
   df = df.repartitionByRange(nPartitions, $"col".desc)
          .withColumn("partitionId",spark_partition_id())

    val w = Window.partitionBy($"partitionId").orderBy("col")
    val groupedDf = df
        .withColumn("g",  floor( (lit(-1)+row_number().over(w))/lit(desiredArrayLength ))) //added -1 as row-number starts from 1
        .groupBy("partitionId","g")
        .agg( collect_list("col").alias("col"))
        .select("col")

最终将两个结果结合起来就得到了您想要的结果,

val result = groupedDf.union(groupedPruneDf)
result.show(truncate=false)

0
投票

今天遇到同样的问题,对于简单的情况,这可能是一个更干净的解决方案。

输入:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val partitionCount = 1   // more if can be partition by key to parallelize
var df = List("a", "b", "c", "d", "e", "f", "g")
  .toDF("col")
  .withColumn("row_num", monotonically_increasing_id())  // if order matters
df.show()

结果:

+---+-------+
|col|row_num|
+---+-------+
|  a|      0|
|  b|      1|
|  c|      2|
|  d|      3|
|  e|      4|
|  f|      5|
|  g|      6|
+---+-------+

定义分组和可选附加转换的函数

def processByPatch(iter: Iterator[Row]): Iterator[List[String]] = {
  iter
    .grouped(2)
    .map{grouped =>
      grouped.map{item =>
        item.getString(0)
      }.toList
    }
}

重新分区和mapPartitions:

df
  .repartition(partitionCount, $"col")
  .sortWithinPartitions($"row_num")    // if order matters
  .mapPartitions{iter: Iterator[Row] =>
    processByPatch(iter)
  }
  .show()

结果:

+------+
| value|
+------+
|[a, b]|
|[c, d]|
|[e, f]|
|   [g]|
+------+
© www.soinside.com 2019 - 2024. All rights reserved.