What is ExternalRDDScan in the DAG?


What does ExternalRDDScan mean in the DAG?

I could not find an explanation of it anywhere online.

apache-spark directed-acyclic-graphs internals
2 Answers
8 votes

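Based on the source, ExternalRDDScan represents the conversion of an existing RDD of arbitrary objects into a dataset of InternalRow, i.e. creating a DataFrame. Let's verify that this understanding is correct: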

scala> import spark.implicits._
import spark.implicits._

scala> val rdd = sc.parallelize(Array(1, 2, 3, 4, 5))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26

scala> rdd.toDF().explain()
== Physical Plan ==
*(1) SerializeFromObject [input[0, int, false] AS value#2]
+- Scan ExternalRDDScan[obj#1]
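
The same check can be done outside the shell. The following is a minimal sketch, assuming a local Spark installation with the spark-sql dependency on the classpath; the object name `ExternalRDDScanCheck` is made up for illustration. It prints the class names of the leaf operators of the physical plan instead of relying on the text of `explain()`, which can differ between Spark versions.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical object name, used only for this illustration.
object ExternalRDDScanCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("ExternalRDDScanCheck")
      .getOrCreate()
    import spark.implicits._

    // Same input as in the REPL session above.
    val rdd = spark.sparkContext.parallelize(Array(1, 2, 3, 4, 5))
    val df = rdd.toDF()

    // sparkPlan is the physical plan before execution-preparation rules
    // (such as whole-stage codegen wrappers) are applied.
    val leaves = df.queryExecution.sparkPlan.collectLeaves()

    // Print the class name of every leaf operator of the physical plan.
    leaves.foreach(leaf => println(leaf.getClass.getSimpleName))

    spark.stop()
  }
}
```

If the reading above is right, the only leaf printed should be the scan operator that sits under SerializeFromObject in the plan.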

4 votes

ExternalRDD is the logical representation of a DataFrame/Dataset (though not in all cases) in the query execution plan, i.e. the DAG that Spark creates.

An ExternalRDD is created:

  1. When you create a DataFrame from an RDD (i.e. using createDataFrame(), toDF())
  2. When you create a Dataset from an RDD (i.e. using createDataset(), toDS())
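
As a rough way to try the creation paths listed above side by side, here is a sketch that prints the leaf node names of the logical plan for each one. It assumes a local Spark setup; the object name `ExternalRDDLeaves` and the helper `leafNames` are made up for illustration. The last case, `createDataFrame` with an `RDD[Row]` and an explicit schema, is my own addition: as far as I can tell from the Spark source it goes through a different logical node, which may be one of the cases the "not in all cases" caveat refers to, but treat that as an assumption and check the printed output on your version.

```scala
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Hypothetical object name, used only for this illustration.
object ExternalRDDLeaves {
  // Hypothetical helper: names of the leaf nodes of the (unanalyzed) logical plan.
  def leafNames(ds: Dataset[_]): Seq[String] =
    ds.queryExecution.logical.collectLeaves().map(_.nodeName)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("ExternalRDDLeaves")
      .getOrCreate()
    import spark.implicits._

    val rdd = spark.sparkContext.parallelize(Seq(1, 2, 3, 4, 5))

    // The creation paths named in the list above.
    println("toDF:            " + leafNames(rdd.toDF()))
    println("toDS:            " + leafNames(rdd.toDS()))
    println("createDataset:   " + leafNames(spark.createDataset(rdd)))
    // createDataFrame(RDD[A]) needs a Product type, so wrap each Int in a Tuple1.
    println("createDataFrame: " + leafNames(spark.createDataFrame(rdd.map(Tuple1(_)))))

    // Assumption (not from the answer): an RDD[Row] plus an explicit schema
    // may be planned through a different logical node.
    val schema = StructType(Seq(StructField("value", IntegerType, nullable = false)))
    println("RDD[Row]+schema: " + leafNames(spark.createDataFrame(rdd.map(Row(_)), schema)))

    spark.stop()
  }
}
```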

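At runtime, when the ExternalRDD has to be loaded into memory, a scan operation takes place, which is represented by ExternalRDDScan (internally, the scan strategy resolves it to ExternalRDDScanExec). See the following example: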

scala> val sampleRDD = sc.parallelize(Seq(1,2,3,4,5))
sampleRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> sampleRDD.toDF.queryExecution
res0: org.apache.spark.sql.execution.QueryExecution =
== Parsed Logical Plan ==
SerializeFromObject [input[0, int, false] AS value#2]
+- ExternalRDD [obj#1]

== Analyzed Logical Plan ==
value: int
SerializeFromObject [input[0, int, false] AS value#2]
+- ExternalRDD [obj#1]

== Optimized Logical Plan ==
SerializeFromObject [input[0, int, false] AS value#2]
+- ExternalRDD [obj#1]

== Physical Plan ==
*(1) SerializeFromObject [input[0, int, false] AS value#2]
+- Scan[obj#1]

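As you can see, the DataFrame is represented by ExternalRDD in the logical plans, and the physical plan contains a scan operation that resolves to ExternalRDDScan (ExternalRDDScanExec) during execution.

The same holds for Spark Datasets.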

scala> val sampleRDD = sc.parallelize(Seq(1,2,3,4,5))
sampleRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> sampleRDD.toDS.queryExecution.logical
res9: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
SerializeFromObject [input[0, int, false] AS value#23]
+- ExternalRDD [obj#22]

scala> spark.createDataset(sampleRDD).queryExecution.logical
res18: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
SerializeFromObject [input[0, int, false] AS value#39]
+- ExternalRDD [obj#38]

The examples above were run on Spark 2.4.2.

Reference: https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-LogicalPlan-ExternalRDD.html
