如何处理 Apache Spark 中不断变化的 parquet 模式

问题描述 投票:0回答:4

我遇到了一个问题,我将 Parquet 数据作为 S3 中的每日块(以

s3://bucketName/prefix/YYYY/MM/DD/
的形式),但我无法从不同日期读取 AWS EMR Spark 中的数据,因为某些列类型不匹配,我得到一个许多例外,例如:

java.lang.ClassCastException: optional binary element (UTF8) is not a group
当某些文件中存在具有值的数组类型,但同一列在其他文件中可能具有

null

 值,然后将其推断为字符串类型时,会出现 

org.apache.spark.SparkException: Job aborted due to stage failure: Task 23 in stage 42.0 failed 4 times, most recent failure: Lost task 23.3 in stage 42.0 (TID 2189, ip-172-31-9-27.eu-west-1.compute.internal): org.apache.spark.SparkException: Failed to merge incompatible data types ArrayType(StructType(StructField(Id,LongType,true), StructField(Name,StringType,true), StructField(Type,StringType,true)),true)

我在 S3 中有 JSON 格式的原始数据,我最初的计划是创建一个自动作业,该作业启动一个 EMR 集群,读取前一个日期的 JSON 数据,然后将其作为 parquet 写回 S3。

JSON 数据也分为日期,即键具有日期前缀。读取 JSON 效果很好。无论当前读取多少数据,都会从数据中推断出架构。

但是当写入 parquet 文件时问题就会出现。据我了解,当我使用元数据文件编写镶木地板时,这些文件包含镶木地板文件的所有部分/分区的架构。在我看来,这也可以具有不同的模式。当我禁用写入元数据时,据说 Spark 会从给定 Parquet 路径中的第一个文件推断整个架构,并假设它在其他文件中保持不变。

当某些应为

double

 类型的列在给定日期仅具有整数值时,从 JSON(这些数字为整数,没有浮点数)读取它们会使 Spark 认为它是类型为 
long 的列
。即使我可以在写入 Parquet 文件之前将这些列转换为双倍,但这仍然不好,因为架构可能会更改,可以添加新列,并且跟踪这是不可能的。 

我看到有些人有同样的问题,但我还没有找到足够好的解决方案。

对此的最佳实践或解决方案是什么?

apache-spark apache-spark-sql parquet amazon-emr
4个回答
9
投票
这些是我用于将 parquet 写入 S3 的选项;关闭模式合并可以提高写回性能 - 它还可以解决您的问题

val PARQUET_OPTIONS = Map( "spark.sql.parquet.mergeSchema" -> "false", "spark.sql.parquet.filterPushdown" -> "true")
    

9
投票
当我从 JSON 中读取每日块中的数据并写入每日 S3 文件夹中的 Parquet 时,在读取 JSON 时未指定自己的架构,或在写入 Parquet 之前将容易出错的列转换为正确的类型,Spark 可能会推断出不同日期的不同架构数据的价值取决于数据实例中的值,并写入具有冲突架构的 Parquet 文件。

这可能不是完美的解决方案,但我发现用不断发展的模式解决问题的唯一方法如下:

在每天(更具体地说是每晚)批处理前一天数据的 cron 作业之前,我正在创建一个大部分为空值的虚拟对象。

我确保 ID 是可识别的,例如,由于真实数据具有唯一的 ID,我将“虚拟”字符串作为 ID 添加到虚拟数据对象中。

然后我将为易于出错类型的属性提供预期值,例如我将提供浮点/双精度非零值,因此在编组到 JSON 时,它们肯定会有小数分隔符,例如“0.2”而不是“0” (当编组为 JSON 时,具有 0 值的双精度数/浮点数显示为“0”而不是“0.0”)。

字符串、布尔值和整数工作正常,但除了双精度/浮点之外,我还需要将数组实例化为空数组和其他类/结构的对象以及相应的空对象,这样它们就不会是“null”-s,如 Spark将 null-s 作为字符串读取。


然后,如果我填写了所有必需的字段,我会将对象编组为 JSON 并将文件写入 S3。

然后我会在 Scala 批处理脚本中使用这些文件来读入它们,将模式保存到变量中,并在读入真实 JSON 数据时将此模式作为参数提供,以避免 Spark 进行自己的模式推断。

这样我就知道所有字段始终具有相同类型,并且只有在添加新字段时才需要加入架构。

当然,它增加了一个缺点,即在添加容易出错类型的新字段时手动更新虚拟对象创建,但这目前是一个小缺点,因为它是我发现的唯一可行的解决方案。


1
投票
只需创建一个 rdd[String],其中每个字符串都是一个 json,当将 rdd 作为数据帧时,使用 PrimitiveAsString 选项将所有数据类型设置为 String

val binary_zip_RDD = sc.binaryFiles(batchHolder.get(i), minPartitions = 50000) // rdd[String] each string is a json ,lowercased json val TransformedRDD = binary_zip_RDD.flatMap(kv => ZipDecompressor.Zip_open_hybrid(kv._1, kv._2, proccessingtimestamp)) // now the schema of dataframe would be consolidate schema of all json strings val jsonDataframe_stream = sparkSession.read.option("primitivesAsString", true).json(TransformedRDD) println(jsonDataframe_stream.printSchema()) jsonDataframe_stream.write.mode(SaveMode.Append).partitionBy(GetConstantValue.DEVICEDATE).parquet(ApplicationProperties.OUTPUT_DIRECTORY)
    

0
投票
也许有点晚了,但如果你的 Json 数据带有模式,并且你正在使用 Python,你可以考虑使用

https://pypi.org/project/json2spark-mapper/ 来克服这些问题通过 json 文档进行类型推断。

© www.soinside.com 2019 - 2024. All rights reserved.