Does Spark run the same partition on the same executor?


Before explaining my real use case, let me ask the question in its simplest form:

If I use the HashPartitioner to repartition the same dataframe 3 times, with the same partition key field and the same number of partitions, the composition of the partitions will be identical, because the key hashes have not changed (key_hash % partitions_number always gives the same result).
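
As a minimal sketch of that claim (my own illustration, not from the original question): with the RDD-level HashPartitioner the target partition is a pure function of the key and the number of partitions, so repeating the operation reproduces the same layout. DataFrame repartition(n, col) uses a Murmur3-based HashPartitioning instead, but the same hash-modulo reasoning applies.

import org.apache.spark.HashPartitioner

val p = new HashPartitioner(10)
// getPartition is deterministic: the same key always maps to the same partition id
Seq("a", "b", "c").foreach(k => println(s"$k -> partition ${p.getPartition(k)}"))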

Does Spark internally assign the same partition id to the same executor, or does it assign partitions to random executors?

For example, given the keys, if the number of partitions does not change, partition 1 will always contain the same elements even if I repartition 3 times. But will the data be moved around between executors, or will partition 1 always be handled by executor 1?
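
(One way to observe this directly, as an illustration of mine rather than part of the question, assuming df is the dataframe being repartitioned: print the executor id from inside each task and compare the mapping after every repartition.)

import org.apache.spark.SparkEnv

df.rdd.mapPartitionsWithIndex { (idx, it) =>
  // SparkEnv.get.executorId identifies the executor running this task
  println(s"partition $idx handled by executor ${SparkEnv.get.executorId}")
  it
}.count()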

My real use case is the following. I have:

spark.sql("select key, topic, row_number() over (partition by key,topic order by key) as rn from x").createOrReplaceTempView("y")
//mappartitions operation, producing dataframe z, then ->
spark.sql("select key, collect_list(row_number) from z group by key")

In this example the data is first partitioned by (key, topic) and then repartitioned by key (to perform the group by). This could be avoided by making the repartition by key the first operation, which would save one repartition, since partitioning by key benefits both operations; but because of the mapPartitions in between, Spark cannot know that, so it triggers the repartition again. A sketch of that variant follows.
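
Here is what the "repartition by key first" variant could look like, using the simplified column and view names from the snippet above (the partition count of 10 is taken from the plan below):

import org.apache.spark.sql.functions.col

// Repartition by key up front; hash partitioning by key also clusters rows by (key, topic),
// so in principle one exchange could serve both the window and the group by.
spark.table("x").repartition(10, col("key")).createOrReplaceTempView("x_by_key")
spark.sql("select key, topic, row_number() over (partition by key, topic order by key) as rn from x_by_key").createOrReplaceTempView("y")
// ... mapPartitions producing z, then the same group by on z ...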

I think this is unavoidable. My question is: even if Spark performs the same repartition again, is it still worth doing the repartition as the first operation? Would I save the data movement between executors, or is the data moved anyway even when the partitioning is exactly the same?

Here is an example of the query plan: line 3 performs the same hash partitioning as the first operation, and this is caused by the use of mapPartitions at line 7.

 +- Window [row_number() windowspecdefinition(rowkey#817, column_name#815 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS segment_index#839], [rowkey#817], [column_name#815 ASC NULLS FIRST]
                  +- *(5) Sort [rowkey#817 ASC NULLS FIRST, column_name#815 ASC NULLS FIRST], false, 0
                     +- Exchange hashpartitioning(rowkey#817, 10), ENSURE_REQUIREMENTS, [id=#647]
                        +- *(4) Project [rowkey#817, topic#808, field1#809, field2#810, field3#811, field4#812, field5#813, column_name#815, struct(trid, cast(value#828.rt_trid as string), dataDt, value#828.rt_data_dt, dataDtUtc, value#828.rt_start_date_utc, eventIdx, cast(value#828.rt_event_idx as int), lastEvent, value#828.rt_last_event, recordType, value#828.rt_record_type, latitude, value#828.geo.rt_latitude, longitude, value#828.geo.rt_longitude, altitude, value#828.geo.rt_altitude, country, value#828.address.rt_country, county, value#828.address.rt_county, state, value#828.address.rt_state, ... 58 more fields) AS json_struct#881]
                           +- *(4) Project [cast(rowkey#807 as string) AS rowkey#817, topic#808, field1#809, field2#810, field3#811, field4#812, field5#813, column_name#815, avrodeserializerexpression(value#816, hbase, (hbase.columns.cq,schema), (hbase.namespace,dev), (hbase.columns.name.value,EnrichedJourney), (hbase.columns.namespace.value,com.vodafone.automotive.wasp.telematics.datamodel.journey), (hbase.columns.cf,0), (hbase.table,SCHEMA_REPOSITORY), true) AS value#828]
                              +- *(4) SerializeFromObject [if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, rowkey), BinaryType) AS rowkey#807, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, topic), StringType), true, false, true) AS topic#808, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 2, field1), BinaryType) AS field1#809, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 3, field2), BinaryType) AS field2#810, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 4, field3), BinaryType) AS field3#811, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 5, field4), BinaryType) AS field4#812, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 6, field5), BinaryType) AS field5#813, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 8, column_name), StringType), true, false, true) AS column_name#815, if (assertnotnull(input[0, org.apache.spark.sql.Row, true]).isNullAt) null else validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 9, value), BinaryType) AS value#816]
                                 +- MapPartitions ######.hbase.HBaseReader$$$Lambda$3992/510160290@44b639d3, obj#806: org.apache.spark.sql.Row
                                    +- DeserializeToObject createexternalrow(rowkey#787, topic#738.toString, field1#749, field2#751, field3#753, field4#755, field5#757, StructField(rowkey,BinaryType,true), StructField(topic,StringType,true), StructField(field1,BinaryType,true), StructField(field2,BinaryType,true), StructField(field3,BinaryType,true), StructField(field4,BinaryType,true), StructField(field5,BinaryType,true)), obj#805: org.apache.spark.sql.Row
                                       +- *(3) Project [cast(rowkey#737 as binary) AS rowkey#787, topic#738, field1#749, field2#751, field3#753, field4#755, field5#757]
                                          +- *(3) Filter castProcessBooleanUDF(consolidationResult#759)
                                             +- SortAggregate(key=[rowkey#737, topic#738, column_family#739], functions=[max(if ((column_name#740 <=> field1)) value#741 else null), max(if ((column_name#740 <=> field2)) value#741 else null), max(if ((column_name#740 <=> field3)) value#741 else null), max(if ((column_name#740 <=> field4)) value#741 else null), max(if ((column_name#740 <=> field5)) value#741 else null), max(if ((column_name#740 <=> consolidationResult)) value#741 else null)])
                                                +- SortAggregate(key=[rowkey#737, topic#738, column_family#739], functions=[partial_max(if ((column_name#740 <=> field1)) value#741 else null), partial_max(if ((column_name#740 <=> field2)) value#741 else null), partial_max(if ((column_name#740 <=> field3)) value#741 else null), partial_max(if ((column_name#740 <=> field4)) value#741 else null), partial_max(if ((column_name#740 <=> field5)) value#741 else null), partial_max(if ((column_name#740 <=> consolidationResult)) value#741 else null)])
                                                   +- *(2) Sort [rowkey#737 ASC NULLS FIRST, topic#738 ASC NULLS FIRST, column_family#739 ASC NULLS FIRST], false, 0
                                                      +- Exchange hashpartitioning(rowkey#737, 10), REPARTITION_BY_COL, [id=#630]
1 Answer

I did some tests with a Spark 3 shell and 4 executors:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.functions.{ hash, lit }
import spark.implicits._
// Prints, from inside each executor, which partition ids it processes and their contents,
// then prints a separator once per partition so the executor logs are easy to split.
def printPlanAndPartitions(df: DataFrame, name: String): Unit = {
  df.rdd.mapPartitionsWithIndex { case (idx, it) =>
    val values = it.toList // materialise once so the iterator is not consumed twice
    println(s"$name -> partition $idx, values => $values")
    values.iterator
  }.count()
  sc.parallelize(1 to 4, 4).foreachPartition(_ => println("---------------------------------"))
}

val df = sc.parallelize((1 to 20).map(x => x % 4 -> x ), 4).toDF("value", "valuex")
val df2 = df.repartition(4,$"value")
val df2_1 = df2.mapPartitions(r => r)(RowEncoder(df2.schema))
val df3 = df2_1.repartition(4,$"value")
val df3_1 = df3.mapPartitions(r => r)(RowEncoder(df2.schema))
val df4 = df3_1.repartition(4,$"value")
printPlanAndPartitions(df2,"2")
printPlanAndPartitions(df2_1,"2_1")
printPlanAndPartitions(df3,"3")
printPlanAndPartitions(df3_1,"3_1")
printPlanAndPartitions(df4,"4")
sc.parallelize(1 to 4,4).foreachPartition(_ => println("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\n\n\n"))

After each mapPartitions, even though it does nothing, a hash repartition is triggered again; df4.explain returns:

== Physical Plan ==
Exchange hashpartitioning(value#1587, 4), REPARTITION_BY_NUM, [id=#2579]
+- *(3) SerializeFromObject [validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, value), IntegerType) AS value#1587, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, valuex), IntegerType) AS valuex#1588]
   +- MapPartitions $line218.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$Lambda$4546/1755336413@56174c86, obj#1586: org.apache.spark.sql.Row
      +- DeserializeToObject createexternalrow(value#1581, valuex#1582, StructField(value,IntegerType,false), StructField(valuex,IntegerType,false)), obj#1585: org.apache.spark.sql.Row
         +- Exchange hashpartitioning(value#1581, 4), REPARTITION_BY_NUM, [id=#2573]
            +- *(2) SerializeFromObject [validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 0, value), IntegerType) AS value#1581, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true]), 1, valuex), IntegerType) AS valuex#1582]
               +- MapPartitions $line216.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$Lambda$4545/2107915335@709c4bb7, obj#1580: org.apache.spark.sql.Row
                  +- DeserializeToObject createexternalrow(value#1563, valuex#1564, StructField(value,IntegerType,false), StructField(valuex,IntegerType,false)), obj#1579: org.apache.spark.sql.Row
                     +- *(1) ColumnarToRow
                        +- InMemoryTableScan [value#1563, valuex#1564]
                              +- InMemoryRelation [value#1563, valuex#1564], StorageLevel(disk, memory, deserialized, 1 replicas)
                                    +- Exchange hashpartitioning(value#1563, 4), REPARTITION_BY_NUM, [id=#2278]
                                       +- *(1) Project [_1#1558 AS value#1563, _2#1559 AS valuex#1564]
                                          +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1 AS _1#1558, knownnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2 AS _2#1559]
                                             +- Scan[obj#1557]

The shell prints, from inside each executor, the partition being processed (from 0 to 3) together with the values it contains. In this version the values stay the same for each partition id, but the executors run different partitions every time. Here is the output of one executor:

2 -> partition 1, values => List()
---------------------------------
2_1 -> partition 3, values => List([3,7], [0,8], [1,9], [3,11], [0,12], [1,13], [3,15], [0,16], [1,17], [3,19], [0,20], [1,1], [3,3], [0,4], [1,5])
---------------------------------
3 -> partition 2, values => List([2,6], [2,10], [2,14], [2,18], [2,2])
---------------------------------
3_1 -> partition 2, values => List([2,6], [2,10], [2,18], [2,14], [2,2])
---------------------------------
4 -> partition 3, values => List([3,7], [0,8], [1,9], [0,16], [1,17], [3,19], [0,20], [3,11], [0,12], [1,13], [3,15], [1,1], [3,3], [0,4], [1,5])
---------------------------------
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx

But if I add a cache after the first repartition:

...
val df2 = df.repartition(4,$"value").cache
...

I get mixed results, but there seems to be a certain tendency to avoid moving the data. Here are the results from 2 executors:

2 -> partition 1, values => List()
---------------------------------
2_1 -> partition 1, values => List()
---------------------------------
3 -> partition 1, values => List()
---------------------------------
3_1 -> partition 0, values => List()
---------------------------------
4 -> partition 0, values => List()
---------------------------------
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx First executor


2 -> partition 2, values => List([2,18], [2,14], [2,2], [2,6], [2,10])
---------------------------------
2_1 -> partition 2, values => List([2,18], [2,14], [2,2], [2,6], [2,10])
---------------------------------
3 -> partition 2, values => List([2,18], [2,14], [2,2], [2,6], [2,10])
---------------------------------
3_1 -> partition 2, values => List([2,18], [2,14], [2,2], [2,6], [2,10])
---------------------------------
4 -> partition 2, values => List([2,18], [2,14], [2,2], [2,6], [2,10])
---------------------------------
xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx Second executor

As you can see, it does not move the populated partition 2, but it sometimes changes the executor for the empty partitions. I think there is some mechanism that tries to at least reduce the shuffle by scheduling partitions on the executors that already store their data.
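
One way to probe that mechanism (my own addition to the test, not something the output above proves): after materialising the cache, ask the scheduler via the developer API SparkContext.getPreferredLocs where it would prefer to run each partition; for cached data the preferred locations should be the executors holding the blocks.

// Assumes df2 = df.repartition(4, $"value").cache from the test above
val cachedRdd = df2.rdd
cachedRdd.count() // materialise the cache first
cachedRdd.partitions.indices.foreach { idx =>
  println(s"partition $idx -> preferred locations: ${sc.getPreferredLocs(cachedRdd, idx)}")
}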

The same behaviour shows up with 8 partitions as well:

2 -> partition 6, values => List([4,12], [2,18], [4,20], [2,10], [2,2], [4,4])
2 -> partition 0, values => List()
---------------------------------
2_1 -> partition 0, values => List()
2_1 -> partition 6, values => List([4,12], [2,18], [4,20], [2,10], [2,2], [4,4])
---------------------------------
3 -> partition 6, values => List([4,12], [2,18], [4,20], [2,10], [2,2], [4,4])
3 -> partition 5, values => List()
---------------------------------
3_1 -> partition 6, values => List([4,12], [2,18], [4,20], [2,10], [2,2], [4,4])
---------------------------------
4 -> partition 6, values => List([4,12], [2,18], [4,20], [2,10], [2,2], [4,4])
4 -> partition 4, values => List()

I also tried caching before the first repartition, but then the data gets moved again:

...
val df = sc.parallelize((1 to 20).map(x => x % 4 -> x ), 4).toDF("value", "valuex").cache
val df2 = df.repartition(4,$"value")
...

I hope this analysis is interesting, enjoy!
