How to do a total ordering in Apache Spark without running into OOM?

Problem description

I need to assign a rank id to my dataframe based on a score. A simple row_number() over (order by score) gives me an OOM, because a window with no PARTITION BY pulls all the data onto a single machine. For example:

select *, row_number() over (order by score) as rank_id from tbl order by score

monotonically_increasing_id() does not produce what I want either, because I need consecutive rank ids. Doing the same thing in MapReduce is quite straightforward, but I have not found a way to do it in Spark, which is curious...
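A minimal sketch (values illustrative) of why monotonically_increasing_id() is not consecutive: it encodes the partition id in the upper 31 bits and a per-partition counter in the lower 33 bits, so the ids jump between partitions:

import org.apache.spark.sql.functions.monotonically_increasing_id

val demo = spark.range(0, 6).repartition(3)
   .withColumn("mid", monotonically_increasing_id())
demo.orderBy("mid").show()
// Typical ids: 0, 1, 8589934592 (= 2^33), 8589934593, ... - unique and
// increasing within each partition, but not consecutive across partitions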

apache-spark sql-order-by row-number
1 Answer

So, given the coronavirus situation: 1) I had some time to consider this, although I think a traditional ORACLE DB might well be better at it, and 2) I noticed that Databricks is really slow at present.

In any case, Spark works per partition (for parallelism), not across partitions, so as to increase throughput, and that is exactly the problem here. I am not sure it would be easier in MR, but if it is, use it, although that approach is less in vogue these days.

I rolled my own solution, and with range partitioning you can handle rank / dense_rank: within a range of values, rows with the same value all land in one partition, so you can rank within each partition and then derive the overall rank with a little smarts (i.e. offsets), relying on range partitioning to put increasing values into increasing partition numbers; the sketch below illustrates the offset arithmetic. I am not entirely sure the caching is all fine, but it only took a modest amount of effort - I guess, given the situation we all know about, plenty of people are studying at home anyway.
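To make the offset idea concrete, here is a tiny standalone sketch (plain Scala, numbers invented) of how per-partition max ranks become offsets via a cumulative sum:

// Illustrative only: suppose 3 range partitions produced these per-partition max ranks
val maxRankPerPartition = Seq(4, 5, 4)
// Each partition's offset is the sum of the max ranks of all earlier partitions
val offsets = maxRankPerPartition.scanLeft(0)(_ + _).init   // Seq(0, 4, 9)
// A row with rank r in partition p then gets overall rank r + offsets(p)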

Also, this is a good source: https://www.waitingforcode.com/apache-spark-sql/range-partitioning-apache-spark-sql/read

Code

// Took some shortcuts on field names; concentrated more on the algorithm itself so as to avoid the single-partition aspect

import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Window

case class X(id: Long, name: String, amount: Double)

Data:

// Gen example data via DF
val df = Seq(
         (10, "order 1", 2000d), (11, "order 2", 240d), (12, "order 3", 232d), (13, "order 4", 100d), (214, "order 5", 11d), (15, "order 6", 1d),
         (2141, "order 7", 390d), (17, "order 8", 30d), (18, "order 9", 99d), (19, "order 10", 55d),  (20, "order 11", 129d), (21, "order 11", 75d), (15, "order 13", 1d)
        ).toDF("id", "name", "amount")

Processing:

// Make a Dataset - the case class makes the conversion to an RDD and back to a DF/DS easier
val ds = df.as[X]

// Number of partitions for parallel processing so as to increase throughput; n can be anything, but more partitions give more parallelism
// Range partitioning puts all rows with the same value into the same partition - that is the clue here
// This allows dense and non-dense rank, not just an order by row number as in your example
val n = 5
val rdd = ds.repartitionByRange(n, $"amount")
   .rdd
   .mapPartitionsWithIndex((index, iter) => {
      // Tag every row with its range-partition index; the DF columns become _1 (index) and _2 (row)
      iter.map(x => (index, x))
    })
val df2 = rdd.toDF().cache
df2.createOrReplaceTempView("tab1")
// Get the ranking per range partition
val res1 = spark.sql("""select *, rank() over (partition by _1 order by _2.amount asc) as RANK from tab1 """).cache
// res1.rdd.getNumPartitions // the number of partitions does not change - the range partitioning is maintained

res1.createOrReplaceTempView("tab2") 
// Get max rank per partition; ideally needs caching, or a write to disk, to avoid oddities from recomputation and possible caching bugs. Not always convinced it works.
spark.sql("drop table if exists MAXVALS")
spark.sql(""" create table MAXVALS as select 'dummy' as dummy, _1, max(RANK) as max_rank from tab2 GROUP BY _1 UNION SELECT 'dummy', -1, 0  """)
val resA = spark.table("MAXVALS")
// Get offsets
val resB = resA.withColumn("cum_Max_RANK", sum("max_rank").over(
  Window
    .partitionBy("dummy")
    .orderBy(col("_1")) ))
resB.createOrReplaceTempView("tabC")

// So all the stuff works in parallel, but does it really help??? Is an RDBMS not better, then???
val finalResult = spark.sql("""  select tab2._2, (tab2.RANK + tabC.cum_Max_RANK) as OVERALLRANK from tab2, tabc  where tabc._1 = (tab2._1 -1) ORDER BY OVERALLRANK ASC  """)
finalResult.show(false)

Result

+----------------------+-----------+
|_2                    |OVERALLRANK|
+----------------------+-----------+
|[15, order 6, 1.0]    |1          |
|[15, order 13, 1.0]   |1          |
|[214, order 5, 11.0]  |3          |
|[17, order 8, 30.0]   |4          |
|[19, order 10, 55.0]  |5          |
|[21, order 11, 75.0]  |6          |
|[18, order 9, 99.0]   |7          |
|[13, order 4, 100.0]  |8          |
|[20, order 11, 129.0] |9          |
|[12, order 3, 232.0]  |10         |
|[11, order 2, 240.0]  |11         |
|[2141, order 7, 390.0]|12         |
|[10, order 1, 2000.0] |13         |
+----------------------+-----------+
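As a sanity check on this small dataset (where a single partition is harmless), the result can be compared against the naive single-partition version - exactly the pattern that OOMs at scale:

// Small-data check only
ds.createOrReplaceTempView("tbl")
spark.sql("select *, rank() over (order by amount) as rank_id from tbl order by amount").show(false)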

Conclusion

It works. But does it make any sense? Yes, because getting the job done beats an OOM every time.

  • The initial computation can run in parallel for the ordering, but it requires repartitionByRange.
  • However, if you want a guaranteed total sort order (for collect, show), the whole final statement still needs an ORDER BY.
  • .explain does not reveal a single-partition situation, but this would have to be tested at scale. That said, the OOM can be avoided, and I suspect some unnecessary sorting still goes on. sortWithinPartitions might be a pedestrian alternative, but I have left it aside for now; see the sketch after this list.
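For completeness, a hedged sketch of the sortWithinPartitions idea (untested at scale, output path illustrative): after repartitionByRange, lower values already live in lower-numbered partitions, so sorting within each partition gives globally ordered output on write without a final full shuffle:

// Sketch, assuming downstream consumers read the partition files in order
val locallySorted = ds.repartitionByRange(n, $"amount")
   .sortWithinPartitions($"amount")
locallySorted.write.mode("overwrite").parquet("/tmp/amount_sorted")   // path is illustrative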