有什么方法可以在RDD中过滤空分区?我有一些分区后的空分区,我不能在动作方法中使用它们。
我在Scala中使用Apache Spark
从你提供的一点信息来看,我可以想到两个选择。使用 mapPartitions
而只是捕获空的迭代器并返回它们,同时处理非空的迭代器。
rdd.mapPartitions { case iter => if(iter.isEmpty) { iter } else { ??? } }
或者你可以使用 repartition
,以摆脱空分区。
rdd.repartition(10) // or any proper number
这是我的样本数据
val sc = spark.sparkContext
val myDataFrame = spark.range(20).toDF("mycol").repartition($"mycol")
myDataFrame.show(false)
产量 :
+-----+
|mycol|
+-----+
|19 |
|0 |
|7 |
|6 |
|9 |
|17 |
|5 |
|1 |
|10 |
|3 |
|12 |
|8 |
|11 |
|2 |
|4 |
|13 |
|18 |
|14 |
|15 |
|16 |
+-----+
在上面的代码中,当你对列进行重新分区时,将创建200个分区,因为 spark.sql.shuffle.partitions = 200
因为数据只有10个数字,所以很多都是未使用或空的分区(我们试图将20个数字装入200个分区,意味着......大部分分区都是空的......) :-))
1) 准备一个长累加器变量,以快速统计非空分区。 2) 将所有非空的分区添加到累加器变量中,就像下面的例子。
val nonEmptyPartitions = sc.longAccumulator("nonEmptyPartitions")
myDataFrame.foreachPartition(partition =>
if (partition.length > 0) nonEmptyPartitions.add(1))
val finalDf = myDataFrame.coalesce(nonEmptyPartitions.value.toInt)
println(s"nonEmptyPart : ${nonEmptyPartitions.value.toInt}")
println(s"df.rdd.partitions.length : ${myDataFrame.rdd.getNumPartitions}")
println(s"finalDf.rdd.partitions.length : ${finalDf.rdd.getNumPartitions}")
打印它们...
打印结果 :
nonEmptyPart : 20
df.rdd.partitions.length : 200
finalDf.rdd.partitions.length : 20
myDataFrame.withColumn("partitionId", org.apache.spark.sql.functions.spark_partition_id)
.groupBy("partitionId")
.count
.show
结果打印出分区的记录数。
+-----------+-----+
|partitionId|count|
+-----------+-----+
|128 |1 |
|190 |1 |
|140 |1 |
|164 |1 |
|5 |1 |
|154 |1 |
|112 |1 |
|107 |1 |
|4 |1 |
|49 |1 |
|69 |1 |
|77 |1 |
|45 |1 |
|121 |1 |
|143 |1 |
|58 |1 |
|11 |1 |
|150 |1 |
|68 |1 |
|116 |1 |
+-----------+-----+
注意事项 :
使用方法 spark_partition_id
是用于demodebug目的,不用于生产目的。
我把200个分区(由于列上的重新分区)减少到20个非空分区。
最后,你摆脱了额外的空分区,它没有任何数据,避免了不必要的调度到空分区上的dummy任务。
如果你不知道列内的不同值,并希望避免有空分区,你可以使用 countApproxDistinct()
为。
df.repartition(df.rdd.countApproxDistinct().toInt)
如果你想过滤现有的空分区并重新分区,你可以使用Sasa提出的解决方案。
或。
df.repartition(df.mapPartitions(part => List(part.length).iterator).collect().count(_ != 0)).df.getNumPartitions)
然而,在后来的情况下,分区可能包含或不包含按价值计算的记录。