For the given JSON response:
{
"id": "1575972348068_1649088229",
"results": [
{
"rows_count": 53,
"runtime_seconds": 0.004000000189989805,
"columns": [
"ROLE_ID",
"ROLE_NAME"
],
"columns_type": [
"number",
"string"
],
"limit": 2000000000,
"index": 0,
"rows": [
[
"6",
"Incentive Plan Advisor "
],
[
"7",
"Security Admin "
]
],
"command": "<an sql command>"
}
],
"status": "completed"
}
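For reference, the JSON above can be loaded into the `response` DataFrame used below with something like the following sketch (the file name `response.json` is an assumption):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("json-rows")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// multiLine is needed because the JSON object spans several lines
val response = spark.read.option("multiLine", true).json("response.json")
response.printSchema()
```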
I want to get the `rows` from this JSON as a Spark DataFrame. To do this, I am trying to `explode` the `results` entry:
response.show()
val flattened = response.select($"results", explode($"results").as("results_flat1")).select($"results_flat1")
flattened.show()
I get this output:
+--------------------+--------------------+---------+
| id| results| status|
+--------------------+--------------------+---------+
|1575972687102_374...|[[[ROLE_ID, ROLE_...|completed|
+--------------------+--------------------+---------+
+--------------------+
| results_flat1|
+--------------------+
|[[ROLE_ID, ROLE_N...|
+--------------------+
When I try to `explode` this further, I get the following error:
flattened.select($"results_flat1", explode($"results_flat1").as("results_flat2"))
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'explode(`results_flat1`)' due to data type mismatch: input to function explode should be array or map type, not struct<columns:array<string>,columns_type:array<string>,command:string,index:bigint,limit:bigint,rows:array<array<string>>,rows_count:bigint,runtime_seconds:double>;;
'Project [results_flat1#91, explode(results_flat1#91) AS results_flat2#99]
+- Project [results_flat1#91]
+- Project [results#75, results_flat1#91]
+- Generate explode(results#75), false, [results_flat1#91]
+- LogicalRDD [id#74, results#75, status#76], false
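As the error message says, `explode` only accepts array or map columns, and `results_flat1` is a struct. Instead of exploding the struct, its fields can be promoted to top-level columns with `struct.*` selection; a hedged sketch of that idea:

```scala
import org.apache.spark.sql.functions.explode

// results_flat1 is a struct, so select its fields directly instead of exploding it.
// "results_flat1.*" promotes every struct field (columns, rows, rows_count, ...)
// to a top-level column.
val fields = flattened.select("results_flat1.*")
fields.printSchema()

// rows is array<array<string>>, which *is* a legal input to explode
val rows = fields.select(explode($"rows").as("row"))
```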
From my analysis, I can see that `explode` needs an array or map type, not a struct. To work around this, I tried:
val x = spark.read.json(Seq(flattened.first().get(0).asInstanceOf[String]).toDS())
x.show()
Trying this gives another error:
Exception in thread "main" java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to java.lang.String
at org.apache.spark.sql.Row$class.getString(Row.scala:255)
at org.apache.spark.sql.catalyst.expressions.GenericRow.getString(rows.scala:166)
at com.ibm.cmdwcloud.operations.SelectOperations.getRoleListFromEntitlement(SelectOperations.scala:23)
at com.ibm.cmdwcloud.Main$.main(Main.scala:22)
at com.ibm.cmdwcloud.Main.main(Main.scala)
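The cast fails because `flattened.first().get(0)` returns a `Row` (`GenericRowWithSchema`), not a JSON `String`. If a JSON string is really needed, `Dataset.toJSON` can serialize the row instead of casting it; a sketch (note the resulting JSON is wrapped in a `results_flat1` field, so this only sidesteps the `ClassCastException`, it does not unwrap the struct):

```scala
// toJSON serializes each row back to a JSON string, avoiding the Row-to-String cast
val jsonString = flattened.toJSON.first()
val x = spark.read.json(Seq(jsonString).toDS())
x.show()
```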
I don't know of a way to get the `rows` object directly and convert it to a DataFrame. Please help me with this.
Edit:
Although I am able to see this schema:
root
|-- results_flat1: struct (nullable = true)
| |-- columns: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- columns_type: array (nullable = true)
| | |-- element: string (containsNull = true)
| |-- command: string (nullable = true)
| |-- index: long (nullable = true)
| |-- limit: long (nullable = true)
| |-- rows: array (nullable = true)
| | |-- element: array (containsNull = true)
| | | |-- element: string (containsNull = true)
| |-- rows_count: long (nullable = true)
| |-- runtime_seconds: double (nullable = true)
I am unable to explode it.
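Given the schema above, one option is that dot notation reaches inside a struct, so the nested `rows` array can be exploded in a single step without flattening the struct first; a sketch:

```scala
import org.apache.spark.sql.functions.explode

// $"results_flat1.rows" resolves to the array<array<string>> field inside the
// struct, so explode is legal on it even though results_flat1 itself is a struct
val rowsFlat = flattened.select(explode($"results_flat1.rows").as("rows_flat"))
rowsFlat.show()
```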
Edit 2:
Thanks to the answer below, I am getting closer to what I want to achieve. I executed this:
val flattened = response.select($"results", explode($"results").as("results_flat1"))
  .select("results_flat1.*")
  .select($"rows", explode($"rows").as("rows_flat"))
  .select($"rows_flat")
flattened.show()
And got this output:
+--------------------+
|           rows_flat|
+--------------------+
|[6, Incentive Pla...|
|[7, Security Admi...|
+--------------------+
Now, is it possible to explode this further and map it onto a schema, so that I get something like this:
+--------------------+--------------------+
| role_id| role_name|
+--------------------+--------------------+
| 6| Incentive Plan Ad..|
| 7| Security Admin|
+--------------------+--------------------+
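Since each `rows_flat` value is an `array<string>` with a known element order, the target columns can be produced with `getItem` plus aliases rather than another explode; a sketch (the column names `role_id`/`role_name` are taken from the desired output above):

```scala
// getItem(i) extracts the i-th element of the array column
val roles = flattened.select(
  $"rows_flat".getItem(0).as("role_id"),
  $"rows_flat".getItem(1).as("role_name")
)
roles.show()
```

If `role_id` should be numeric rather than a string, a `.cast("long")` can be appended to the first expression before the alias.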