来自 Spark 文档:
由于模式合并是一个相对昂贵的操作,并且在大多数情况下不是必需的,因此我们从 1.5.0 开始默认关闭它。您可以通过在读取 Parquet 文件时将数据源选项 mergeSchema 设置为 true 来启用它(如下例所示),或者将全局 SQL 选项 spark.sql.parquet.mergeSchema 设置为 true。
(https://spark.apache.org/docs/latest/sql-data-sources-parquet.html)
我对文档的理解是,如果我有多个具有不同架构的 parquet 分区,如果我使用
spark.read.option("mergeSchema", "true").parquet(path)
,spark 将能够自动合并这些架构。
如果我在查询时不知道这些分区中存在哪些模式,这似乎是一个不错的选择。
但是,请考虑这样一种情况:我有两个分区,一个使用旧架构,另一个使用新架构,其区别仅在于具有一个附加字段。我们还假设我的代码知道新架构,并且我能够显式传递此架构。
在这种情况下,我会做类似
spark.read.schema(my_new_schema).parquet(path)
的事情。我希望 Spark 在这种情况下所做的就是使用新模式在两个分区中读取数据,并简单地将新列的空值提供给旧分区中的任何行。这是预期的行为吗?或者在这种情况下我还需要使用 option("mergeSchema", "true")
吗?
我希望尽可能避免使用 mergeSchema 选项,以避免文档中提到的额外开销。
我尝试扩展上面链接的 Spark 文档中的示例代码,我的假设似乎是正确的。见下图:
// This is used to implicitly convert an RDD to a DataFrame.
scala> import spark.implicits._
import spark.implicits._
// Create a simple DataFrame, store into a partition directory
scala> val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF: org.apache.spark.sql.DataFrame = [value: int, square: int]
scala> squaresDF.write.parquet("test_data/test_table/key=1")
// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
scala> val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
scala> cubesDF: org.apache.spark.sql.DataFrame = [value: int, cube: int]
scala> cubesDF.write.parquet("test_data/test_table/key=2")
// Read the partitioned table
scala> val mergedDF = spark.read.option("mergeSchema", "true").parquet("test_data/test_table")
mergedDF: org.apache.spark.sql.DataFrame = [value: int, square: int ... 2 more fields]
scala> mergedDF.printSchema()
root
|-- value: integer (nullable = true)
|-- square: integer (nullable = true)
|-- cube: integer (nullable = true)
|-- key: integer (nullable = true)
// Read without mergeSchema option
scala> val naiveDF = spark.read.parquet("test_data/test_table")
naiveDF: org.apache.spark.sql.DataFrame = [value: int, square: int ... 1 more field]
// Note that cube column is missing.
scala> naiveDF.printSchema()
root
|-- value: integer (nullable = true)
|-- square: integer (nullable = true)
|-- key: integer (nullable = true)
// Take the schema from the mergedDF above and use it to read the same table with an explicit schema, but without the "mergeSchema" option.
scala> val explicitSchemaDF = spark.read.schema(mergedDF.schema).parquet("test_data/test_table")
explicitSchemaDF: org.apache.spark.sql.DataFrame = [value: int, square: int ... 2 more fields]
// Spark was able to use the correct schema despite not using the "mergeSchema" option
scala> explicitSchemaDF.printSchema()
root
|-- value: integer (nullable = true)
|-- square: integer (nullable = true)
|-- cube: integer (nullable = true)
|-- key: integer (nullable = true)
// Data is as expected.
scala> explicitSchemaDF.show()
+-----+------+----+---+
|value|square|cube|key|
+-----+------+----+---+
| 3| 9|null| 1|
| 4| 16|null| 1|
| 5| 25|null| 1|
| 8| null| 512| 2|
| 9| null| 729| 2|
| 10| null|1000| 2|
| 1| 1|null| 1|
| 2| 4|null| 1|
| 6| null| 216| 2|
| 7| null| 343| 2|
+-----+------+----+---+
如您所见,当使用显式模式读取数据时,spark 似乎正确地向 parquet 分区中缺少的任何列提供空值。
这让我相当有信心我可以用“不,在这种情况下不需要 mergeSchema 选项”来回答我的问题,但我仍然想知道是否有任何我应该注意的警告。任何其他人的额外帮助将不胜感激。