我需要实现以下目标,由于我对Spark缺乏经验,我很难想出一个完成它的方法:
我已经能够成功读取和写入数据,但是在整个任务的几个方面都没有成功:
任何通过整体方法的帮助将不胜感激!
如果输入JSON具有固定模式,则可以手动指定DF模式,将字段声明为可选。请参阅官方guide。如果您在“”中包含所有值,则可以将它们作为字符串读取,然后转换为所需类型。
我不知道如何处理这个问题,以确保我有效...
使用Dataframe API读取输入,很可能默认值对此任务有用。如果遇到性能问题,请附加Spark Job Timeline。
我不知道如何最好地完成数据类型转换......
使用cast column.cast(DataType)
方法。
例如,您有2个JSON:
{“foo”:“firstVal”} {“foo”:“val”,“bar”:“1”}
你想把'foo'读成String并将bar作为整数读出来,你可以这样写:
val schema = StructType(
StructField("foo", StringType, true) ::
StructField("bar", StringType, true) :: Nil
)
val df = session.read
.format("json").option("path", s"${yourPath}")
.schema(schema)
.load()
val withCast = df.select('foo, 'bar cast IntegerType)
withCast.show()
withCast.write.format("parquet").save(s"${outputPath}")