在Amazon EMR中将JSON转换为Parquet

问题描述 投票:3回答:1

我需要实现以下目标,由于我对Spark缺乏经验,我很难想出一个完成它的方法:

  • 从存储在S3中的.json.gz文件中读取数据。 每个文件都包含部分日期的Google Analytics数据,其中包含https://support.google.com/analytics/answer/3437719?hl=en中指定的架构。 文件名采用模式ga_sessions_20170101_Part000000000000_TX.json.gz,其中20170101是YYYYMMDD日期规范,000000000000是一天内有多个文件时的增量计数器(通常是这种情况)。 因此,整天的数据由具有增量“部件号”的多个文件组成。 每天通常有3到5个文件。 无论上述模式文档中指定的数据类型如何,JSON文件中的所有字段都使用qoute(“)分隔符存储。因此,通过读取文件(通过sqlContext.read.json)生成的数据框将每个字段都键入为字符串,即使有些实际上是整数,布尔或其他数据类型。
  • 根据模式规范将全字符串数据帧转换为正确类型的数据帧。 我的目标是正确输入数据框,这样当它以Parquet格式保存时,数据类型是正确的。 并非模式规范中的所有字段都存在于每个输入文件中,甚至不是每天都有输入文件(模式可能随着时间的推移而发生变化)。因此,转换需要是动态的,只转换数据帧中实际存在的字段的数据类型。
  • 将正确键入的数据帧中的数据以Parquet格式写回S3。 数据应按日分区,每个分区存储在名为“partition_date = YYYYMMDD”的单独文件夹中,其中“YYYYMMDD”是与数据关联的实际日期(来自原始输入文件名)。 我不认为每天的文件数量很重要。目标只是分区Parquet格式数据,我可以指向Spectrum。

我已经能够成功读取和写入数据,但是在整个任务的几个方面都没有成功:

  • 我不知道如何解决这个问题,以确保我有效地利用AWS EMR集群充分发挥并行/分布式处理的潜力,无论是在读取,转换还是写入数据。我想根据需要调整集群的大小,以便在我选择的任何时间范围内完成任务(在合理范围内)。
  • 我不知道如何最好地完成数据类型转换。不知道在任何特定批输入文件中将存在或不存在哪些字段需要动态代码来重新键入数据帧。我还想确保这个任务有效地分发并且没有低效完成(我担心在重新输入每个字段时创建一个新的数据框)。
  • 我不明白如何适当地管理数据的分区。

任何通过整体方法的帮助将不胜感激!

apache-spark emr
1个回答
0
投票

如果输入JSON具有固定模式,则可以手动指定DF模式,将字段声明为可选。请参阅官方guide。如果您在“”中包含所有值,则可以将它们作为字符串读取,然后转换为所需类型。

我不知道如何处理这个问题,以确保我有效...

使用Dataframe API读取输入,很可能默认值对此任务有用。如果遇到性能问题,请附加Spark Job Timeline。

我不知道如何最好地完成数据类型转换......

使用cast column.cast(DataType)方法。

例如,您有2个JSON:

{“foo”:“firstVal”} {“foo”:“val”,“bar”:“1”}

你想把'foo'读成String并将bar作为整数读出来,你可以这样写:

  val schema = StructType(
    StructField("foo", StringType, true) ::
      StructField("bar", StringType, true) :: Nil
  )
  val df = session.read
    .format("json").option("path", s"${yourPath}")
    .schema(schema)
    .load()

  val withCast = df.select('foo, 'bar cast IntegerType)
  withCast.show()
  withCast.write.format("parquet").save(s"${outputPath}")
© www.soinside.com 2019 - 2024. All rights reserved.