这段代码很好,它只读取列
i
(注意最后一行ReadSchema: struct<i:bigint>
,它只读取i
):
import org.apache.spark.sql.Dataset
// Define the case class
case class Foo(i: Long, j: String)
// Create a Dataset of Foo
val ds: Dataset[Foo] = spark.createDataset(Seq(
Foo(1, "Q"),
Foo(10, "W"),
Foo(100, "E")
))
// Filter and cast the column
val result = ds.filter($"i" === 2).select($"i")
// Explain the query plan
result.explain()
// It prints:
//== Physical Plan ==
//*(1) Filter (isnotnull(i#225L) AND (i#225L = 2))
//+- *(1) ColumnarToRow
// +- FileScan parquet [i#225L] Batched: true, DataFilters: [isnotnull(i#225L), (i#225L = 2)], //Format: Parquet, Location: InMemoryFileIndex(1 paths)[dbfs:/tmp/foo], PartitionFilters: [], //PushedFilters: [IsNotNull(i), EqualTo(i,2)], ReadSchema: struct<i:bigint>
但是,如果我使用
val result = ds.filter(_.i == 10).map(_.i)
,物理计划将读取所有列,包括j
(注意最后一行ReadSchema: struct<i:bigint,j:string>
):
//= Physical Plan ==
//*(1) SerializeFromObject [input[0, bigint, false] AS value#336L]
//+- *(1) MapElements //$line64a700cfcea442ea899a5731e37978a9115.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$Lambda$8811/2079//839768@1028cff, obj#335: bigint
// +- *(1) Filter //$line64a700cfcea442ea899a5731e37978a9115.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$Lambda$8810/7014//[email protected]
// +- *(1) DeserializeToObject newInstance(class //$line64a700cfcea442ea899a5731e37978a925.$read$$iw$$iw$$iw$$iw$$iw$$iw$Foo), obj#334: //$line64a700cfcea442ea899a5731e37978a925.$read$$iw$$iw$$iw$$iw$$iw$$iw$Foo
// +- *(1) ColumnarToRow
// +- FileScan parquet [i#225L,j#226] Batched: true, DataFilters: [], Format: Parquet, //Location: InMemoryFileIndex(1 paths)[dbfs:/tmp/foo], PartitionFilters: [], PushedFilters: [], //ReadSchema: struct<i:bigint,j:string>
当我在过滤器中使用 scala 语法
_.i
时,为什么 Spark 的处理方式不同?
Dataframe API(使用列引用)和Dataset API不仅在描述计划的方式上不同,而且在管理方式上也不同。
数据集转换是使用 lambda 创建的,对于 Spark 优化器来说就像 UDF,因为它无法看到底层发生了什么。如果您确实希望 Spark 优化您的查询,建议仅使用 dataframe API。
_ 强制收集,像rdd一样处理元组。您会看到 ColumnatToRow。