使用 scala 语法时,Spark 会读取过滤中的所有列

问题描述 投票:0回答:2

这段代码很好,它只读取列

i
(注意最后一行
ReadSchema: struct<i:bigint>
,它只读取
i
):

import org.apache.spark.sql.Dataset

// Define the case class
case class Foo(i: Long, j: String)

// Create a Dataset of Foo
val ds: Dataset[Foo] = spark.createDataset(Seq(
  Foo(1, "Q"),
  Foo(10, "W"),
  Foo(100, "E")
))

// Filter and cast the column
val result = ds.filter($"i" === 2).select($"i")

// Explain the query plan
result.explain()

// It prints:
//== Physical Plan ==
//*(1) Filter (isnotnull(i#225L) AND (i#225L = 2))
//+- *(1) ColumnarToRow
//   +- FileScan parquet [i#225L] Batched: true, DataFilters: [isnotnull(i#225L), (i#225L = 2)], //Format: Parquet, Location: InMemoryFileIndex(1 paths)[dbfs:/tmp/foo], PartitionFilters: [], //PushedFilters: [IsNotNull(i), EqualTo(i,2)], ReadSchema: struct<i:bigint>

但是,如果我使用

val result = ds.filter(_.i == 10).map(_.i)
,物理计划将读取所有列,包括
j
(注意最后一行
ReadSchema: struct<i:bigint,j:string>
):

//= Physical Plan ==
//*(1) SerializeFromObject [input[0, bigint, false] AS value#336L]
//+- *(1) MapElements //$line64a700cfcea442ea899a5731e37978a9115.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$Lambda$8811/2079//839768@1028cff, obj#335: bigint
//   +- *(1) Filter //$line64a700cfcea442ea899a5731e37978a9115.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$Lambda$8810/7014//[email protected]
//      +- *(1) DeserializeToObject newInstance(class //$line64a700cfcea442ea899a5731e37978a925.$read$$iw$$iw$$iw$$iw$$iw$$iw$Foo), obj#334: //$line64a700cfcea442ea899a5731e37978a925.$read$$iw$$iw$$iw$$iw$$iw$$iw$Foo
//         +- *(1) ColumnarToRow
//            +- FileScan parquet [i#225L,j#226] Batched: true, DataFilters: [], Format: Parquet, //Location: InMemoryFileIndex(1 paths)[dbfs:/tmp/foo], PartitionFilters: [], PushedFilters: [], //ReadSchema: struct<i:bigint,j:string>

当我在过滤器中使用 scala 语法

_.i
时,为什么 Spark 的处理方式不同?

apache-spark apache-spark-dataset catalyst-optimizer frameless
2个回答
0
投票

Dataframe API(使用列引用)和Dataset API不仅在描述计划的方式上不同,而且在管理方式上也不同。

数据集转换是使用 lambda 创建的,对于 Spark 优化器来说就像 UDF,因为它无法看到底层发生了什么。如果您确实希望 Spark 优化您的查询,建议仅使用 dataframe API。


0
投票

_ 强制收集,像rdd一样处理元组。您会看到 ColumnatToRow。

© www.soinside.com 2019 - 2024. All rights reserved.