如果我显式传递模式,我是否需要在带有 parquet 的 Spark 中使用“mergeSchema”选项?

问题描述 投票:0回答:1

来自 Spark 文档:

由于模式合并是一个相对昂贵的操作,并且在大多数情况下不是必需的,因此我们从 1.5.0 开始默认关闭它。您可以通过在读取 Parquet 文件时将数据源选项 mergeSchema 设置为 true 来启用它(如下例所示),或者将全局 SQL 选项 spark.sql.parquet.mergeSchema 设置为 true。

https://spark.apache.org/docs/latest/sql-data-sources-parquet.html

我对文档的理解是,如果我有多个具有不同架构的 parquet 分区,如果我使用

spark.read.option("mergeSchema", "true").parquet(path)
,spark 将能够自动合并这些架构。

如果我在查询时不知道这些分区中存在哪些模式,这似乎是一个不错的选择。

但是,请考虑这样一种情况:我有两个分区,一个使用旧架构,另一个使用新架构,其区别仅在于具有一个附加字段。我们还假设我的代码知道新架构,并且我能够显式传递此架构。

在这种情况下,我会做类似

spark.read.schema(my_new_schema).parquet(path)
的事情。我希望 Spark 在这种情况下所做的就是使用新模式在两个分区中读取数据,并简单地将新列的空值提供给旧分区中的任何行。这是预期的行为吗?或者在这种情况下我还需要使用
option("mergeSchema", "true")
吗?

我希望尽可能避免使用 mergeSchema 选项,以避免文档中提到的额外开销。

apache-spark parquet
1个回答
2
投票

我尝试扩展上面链接的 Spark 文档中的示例代码,我的假设似乎是正确的。见下图:

// This is used to implicitly convert an RDD to a DataFrame.
scala> import spark.implicits._
import spark.implicits._

// Create a simple DataFrame, store into a partition directory
scala> val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF: org.apache.spark.sql.DataFrame = [value: int, square: int]

scala> squaresDF.write.parquet("test_data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
scala> val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
scala> cubesDF: org.apache.spark.sql.DataFrame = [value: int, cube: int]

scala> cubesDF.write.parquet("test_data/test_table/key=2")

// Read the partitioned table

scala> val mergedDF = spark.read.option("mergeSchema", "true").parquet("test_data/test_table")
mergedDF: org.apache.spark.sql.DataFrame = [value: int, square: int ... 2 more fields]

scala> mergedDF.printSchema()
root
 |-- value: integer (nullable = true)
 |-- square: integer (nullable = true)
 |-- cube: integer (nullable = true)
 |-- key: integer (nullable = true)


// Read without mergeSchema option
scala> val naiveDF = spark.read.parquet("test_data/test_table")
naiveDF: org.apache.spark.sql.DataFrame = [value: int, square: int ... 1 more field]

// Note that cube column is missing.
scala> naiveDF.printSchema()
root
 |-- value: integer (nullable = true)
 |-- square: integer (nullable = true)
 |-- key: integer (nullable = true)


// Take the schema from the mergedDF above and use it to read the same table with an explicit schema, but without the "mergeSchema" option.
scala> val explicitSchemaDF = spark.read.schema(mergedDF.schema).parquet("test_data/test_table")
explicitSchemaDF: org.apache.spark.sql.DataFrame = [value: int, square: int ... 2 more fields]

// Spark was able to use the correct schema despite not using the "mergeSchema" option
scala> explicitSchemaDF.printSchema()
root
 |-- value: integer (nullable = true)
 |-- square: integer (nullable = true)
 |-- cube: integer (nullable = true)
 |-- key: integer (nullable = true)

// Data is as expected.
scala> explicitSchemaDF.show()
+-----+------+----+---+
|value|square|cube|key|
+-----+------+----+---+
|    3|     9|null|  1|
|    4|    16|null|  1|
|    5|    25|null|  1|
|    8|  null| 512|  2|
|    9|  null| 729|  2|
|   10|  null|1000|  2|
|    1|     1|null|  1|
|    2|     4|null|  1|
|    6|  null| 216|  2|
|    7|  null| 343|  2|
+-----+------+----+---+

如您所见,当使用显式模式读取数据时,spark 似乎正确地向 parquet 分区中缺少的任何列提供空值。

这让我相当有信心我可以用“不,在这种情况下不需要 mergeSchema 选项”来回答我的问题,但我仍然想知道是否有任何我应该注意的警告。任何其他人的额外帮助将不胜感激。

© www.soinside.com 2019 - 2024. All rights reserved.