我遇到了一个问题,我将 Parquet 数据作为 S3 中的每日块(以
s3://bucketName/prefix/YYYY/MM/DD/
的形式),但我无法从不同日期读取 AWS EMR Spark 中的数据,因为某些列类型不匹配,我得到一个许多例外,例如:
java.lang.ClassCastException: optional binary element (UTF8) is not a group
当某些文件中存在具有值的数组类型,但同一列在其他文件中可能具有 null
值,然后将其推断为字符串类型时,会出现。 或
org.apache.spark.SparkException: Job aborted due to stage failure: Task 23 in stage 42.0 failed 4 times, most recent failure: Lost task 23.3 in stage 42.0 (TID 2189, ip-172-31-9-27.eu-west-1.compute.internal):
org.apache.spark.SparkException: Failed to merge incompatible data types ArrayType(StructType(StructField(Id,LongType,true), StructField(Name,StringType,true), StructField(Type,StringType,true)),true)
我在 S3 中有 JSON 格式的原始数据,我最初的计划是创建一个自动作业,该作业启动一个 EMR 集群,读取前一个日期的 JSON 数据,然后将其作为 parquet 写回 S3。
JSON 数据也分为日期,即键具有日期前缀。读取 JSON 效果很好。无论当前读取多少数据,都会从数据中推断出架构。
但是当写入 parquet 文件时问题就会出现。据我了解,当我使用元数据文件编写镶木地板时,这些文件包含镶木地板文件的所有部分/分区的架构。在我看来,这也可以具有不同的模式。当我禁用写入元数据时,据说 Spark 会从给定 Parquet 路径中的第一个文件推断整个架构,并假设它在其他文件中保持不变。
当某些应为
double
类型的列在给定日期仅具有整数值时,从 JSON(这些数字为整数,没有浮点数)读取它们会使 Spark 认为它是类型为
long
的列。即使我可以在写入 Parquet 文件之前将这些列转换为双倍,但这仍然不好,因为架构可能会更改,可以添加新列,并且跟踪这是不可能的。我看到有些人有同样的问题,但我还没有找到足够好的解决方案。
对此的最佳实践或解决方案是什么?
val PARQUET_OPTIONS = Map(
"spark.sql.parquet.mergeSchema" -> "false",
"spark.sql.parquet.filterPushdown" -> "true")
这可能不是完美的解决方案,但我发现用不断发展的模式解决问题的唯一方法如下:
在每天(更具体地说是每晚)批处理前一天数据的 cron 作业之前,我正在创建一个大部分为空值的虚拟对象。
我确保 ID 是可识别的,例如,由于真实数据具有唯一的 ID,我将“虚拟”字符串作为 ID 添加到虚拟数据对象中。
然后我将为易于出错类型的属性提供预期值,例如我将提供浮点/双精度非零值,因此在编组到 JSON 时,它们肯定会有小数分隔符,例如“0.2”而不是“0” (当编组为 JSON 时,具有 0 值的双精度数/浮点数显示为“0”而不是“0.0”)。
字符串、布尔值和整数工作正常,但除了双精度/浮点之外,我还需要将数组实例化为空数组和其他类/结构的对象以及相应的空对象,这样它们就不会是“null”-s,如 Spark将 null-s 作为字符串读取。
然后我会在 Scala 批处理脚本中使用这些文件来读入它们,将模式保存到变量中,并在读入真实 JSON 数据时将此模式作为参数提供,以避免 Spark 进行自己的模式推断。
这样我就知道所有字段始终具有相同类型,并且只有在添加新字段时才需要加入架构。
当然,它增加了一个缺点,即在添加容易出错类型的新字段时手动更新虚拟对象创建,但这目前是一个小缺点,因为它是我发现的唯一可行的解决方案。
val binary_zip_RDD = sc.binaryFiles(batchHolder.get(i), minPartitions = 50000)
// rdd[String] each string is a json ,lowercased json
val TransformedRDD = binary_zip_RDD.flatMap(kv => ZipDecompressor.Zip_open_hybrid(kv._1, kv._2, proccessingtimestamp))
// now the schema of dataframe would be consolidate schema of all json strings
val jsonDataframe_stream = sparkSession.read.option("primitivesAsString", true).json(TransformedRDD)
println(jsonDataframe_stream.printSchema())
jsonDataframe_stream.write.mode(SaveMode.Append).partitionBy(GetConstantValue.DEVICEDATE).parquet(ApplicationProperties.OUTPUT_DIRECTORY)
https://pypi.org/project/json2spark-mapper/ 来克服这些问题通过 json 文档进行类型推断。