在RDD中过滤空分区

问题描述 投票:0回答:2

有什么方法可以在RDD中过滤空分区?我有一些分区后的空分区,我不能在动作方法中使用它们。

我在Scala中使用Apache Spark

scala apache-spark filtering rdd partition
2个回答
1
投票

从你提供的一点信息来看,我可以想到两个选择。使用 mapPartitions 而只是捕获空的迭代器并返回它们,同时处理非空的迭代器。

rdd.mapPartitions { case iter => if(iter.isEmpty) { iter } else { ??? } }

或者你可以使用 repartition,以摆脱空分区。

rdd.repartition(10) // or any proper number

1
投票

这是我的样本数据

  val sc = spark.sparkContext
  val myDataFrame = spark.range(20).toDF("mycol").repartition($"mycol")
  myDataFrame.show(false)

产量 :

+-----+
|mycol|
+-----+
|19   |
|0    |
|7    |
|6    |
|9    |
|17   |
|5    |
|1    |
|10   |
|3    |
|12   |
|8    |
|11   |
|2    |
|4    |
|13   |
|18   |
|14   |
|15   |
|16   |
+-----+

在上面的代码中,当你对列进行重新分区时,将创建200个分区,因为 spark.sql.shuffle.partitions = 200 因为数据只有10个数字,所以很多都是未使用或空的分区(我们试图将20个数字装入200个分区,意味着......大部分分区都是空的......) :-))

1) 准备一个长累加器变量,以快速统计非空分区。 2) 将所有非空的分区添加到累加器变量中,就像下面的例子。

 val nonEmptyPartitions = sc.longAccumulator("nonEmptyPartitions")
 myDataFrame.foreachPartition(partition =>
    if (partition.length > 0) nonEmptyPartitions.add(1))
  • drop 非空的分区(意思是将它们聚合在一起......少洗牌最小洗牌)。
  • 打印它们。

val finalDf = myDataFrame.coalesce(nonEmptyPartitions.value.toInt)
println(s"nonEmptyPart : ${nonEmptyPartitions.value.toInt}")
println(s"df.rdd.partitions.length :  ${myDataFrame.rdd.getNumPartitions}")
println(s"finalDf.rdd.partitions.length  :  ${finalDf.rdd.getNumPartitions}")

打印它们...

打印结果 :

nonEmptyPart : 20
df.rdd.partitions.length :  200
finalDf.rdd.partitions.length  :  20

证明所有非空的分区都被放弃...

  myDataFrame.withColumn("partitionId", org.apache.spark.sql.functions.spark_partition_id)
.groupBy("partitionId")
.count
.show

结果打印出分区的记录数。

 +-----------+-----+
|partitionId|count|
+-----------+-----+
|128        |1    |
|190        |1    |
|140        |1    |
|164        |1    |
|5          |1    |
|154        |1    |
|112        |1    |
|107        |1    |
|4          |1    |
|49         |1    |
|69         |1    |
|77         |1    |
|45         |1    |
|121        |1    |
|143        |1    |
|58         |1    |
|11         |1    |
|150        |1    |
|68         |1    |
|116        |1    |
+-----------+-----+

注意事项 :

  • 使用方法 spark_partition_id 是用于demodebug目的,不用于生产目的。

  • 我把200个分区(由于列上的重新分区)减少到20个非空分区。

结论。

最后,你摆脱了额外的空分区,它没有任何数据,避免了不必要的调度到空分区上的dummy任务。


0
投票

如果你不知道列内的不同值,并希望避免有空分区,你可以使用 countApproxDistinct() 为。

df.repartition(df.rdd.countApproxDistinct().toInt)

如果你想过滤现有的空分区并重新分区,你可以使用Sasa提出的解决方案。

或。

df.repartition(df.mapPartitions(part => List(part.length).iterator).collect().count(_ != 0)).df.getNumPartitions)

然而,在后来的情况下,分区可能包含或不包含按价值计算的记录。

© www.soinside.com 2019 - 2024. All rights reserved.