输入Json数据如下。
{
"CarBrands": [
{
"model": "audi",
"make": " (YEAR == \"2009\" AND CONDITION in (\"Y\") AND RESALE in (\"2015\")) ",
"service": {
"first": null,
"second": [],
"third": []
},
"dealerspot": [
{
"dealername": [
"\"first\"",
"\"abc\""
]
},
{
"dealerlat": [
"\"45.00\"",
"\"38.00\""
]
}
],
"type": "ok",
"plate": true
},
{
"model": "bmw",
"make": " (YEAR == \"2010\" AND CONDITION OR (\"N\") AND RESALE in (\"2016\")) ",
"service": {
"first": null,
"second": [],
"third": []
},
"dealerspot": [
{
"dealername": [
"\"sports\"",
"\"abc\""
]
},
{
"dealerlat": [
"\"45.00\"",
"\"38.00\""
]
}
],
"type": "ok",
"plate": true
},
{
"model": "toy",
"make": " (YEAR == \"2013\" AND CONDITION in (\"Y\") AND RESALE in (\"2018\")) ",
"service": {
"first": null,
"second": [],
"third": []
},
"dealerspot": [
{
"dealername": [
"\"nelson\"",
"\"abc\""
]
},
{
"dealerlat": [
"\"45.00\"",
"\"38.00\""
]
}
],
"type": "ok",
"plate": true
}
]
}
我想在 Spark 中使用 Scala 处理此文件，并得到如下预期输出的 DataFrame：
+-------+-------+-----------+---------------+--------+-----------------+------------+
| model | year | condition | cond_operator | resale | resale_operator |dealername |
+-------+-------+-----------+---------------+--------+-----------------+------------+
|audi | 2009 | Y | in | 2015 | in | first |
|bmw | 2010 | N | OR | 2016 | in | sports |
|toy | 2013 | Y | in | 2018 | in | nelson |
+-------+-------+-----------+---------------+--------+-----------------+------------+
请参考下面的解决方法：
package stackoverflow
import utils.Context
import org.apache.spark.sql.functions.explode
import org.apache.spark.sql.functions.split
/**
 * Reads `carbrands.json`, explodes the top-level `CarBrands` array into one row per car,
 * and prints a flat table with the values parsed out of each car's free-text `make`
 * expression, e.g. ` (YEAR == "2009" AND CONDITION in ("Y") AND RESALE in ("2015")) `.
 *
 * Output columns: `model`, `year`, `condition`, `cond_operator`, `resale`,
 * `resale_operator`, `dealer`.
 *
 * The `make` expression is parsed with anchored regular expressions rather than by
 * splitting on quotes/spaces and picking fixed indexes, so the result does not depend on
 * the exact amount of whitespace between tokens. When a pattern does not match,
 * `regexp_extract` yields an empty string for that column.
 *
 * NOTE(review): `sparkSession` is presumably supplied by the `Context` trait - confirm.
 */
object JsonSample extends App with Context {
  import org.apache.spark.sql.functions.{regexp_extract, regexp_replace}
  import sparkSession.implicits._

  // Captures the quoted value that follows `YEAR ==`.
  private val YearPattern: String = """YEAR\s*==\s*"([^"]*)"""

  // Matches `<field> <operator> (` and captures the operator token (e.g. `in`, `OR`).
  private def operatorPattern(field: String): String =
    raw"""$field\s+(\S+)\s*\("""

  // Matches `<field> <operator> ("<value>"` and captures the text between the quotes.
  private def valuePattern(field: String): String =
    raw"""$field\s+\S+\s*\(\s*"([^"]*)"""

  // The whole JSON document is a single multi-line object, hence `multiLine`.
  val tagsDF = sparkSession.read
    .option("multiLine", true)
    .option("inferSchema", true)
    .json("src/main/resources/carbrands.json")

  // One row per element of the `CarBrands` array.
  val df = tagsDF.select(explode($"CarBrands") as "car_brands")
  df.printSchema()

  private val make = $"car_brands.make"

  val dfd = df.select(
    $"car_brands.model".as("model"),
    regexp_extract(make, YearPattern, 1).as("year"),
    regexp_extract(make, valuePattern("CONDITION"), 1).as("condition"),
    regexp_extract(make, operatorPattern("CONDITION"), 1).as("cond_operator"),
    regexp_extract(make, valuePattern("RESALE"), 1).as("resale"),
    regexp_extract(make, operatorPattern("RESALE"), 1).as("resale_operator"),
    // `dealerspot.dealername` is an array (per dealerspot entry) of name arrays; take the
    // first name of the first entry and drop the literal quotes embedded in the JSON value.
    regexp_replace($"car_brands.dealerspot.dealername"(0)(0), "\"", "").as("dealer")
  )

  dfd.show()
}
输出
+-----+----+---------+-------------+------+---------------+--------+
|model|year|condition|cond_operator|resale|resale_operator| dealer|
+-----+----+---------+-------------+------+---------------+--------+
| audi|2009| Y| in| 2015| in| "first"|
| bmw|2010| N| OR| 2016| in|"sports"|
| toy|2013| Y| in| 2018| in|"nelson"|
+-----+----+---------+-------------+------+---------------+--------+